How to Write Hive UDAF:
1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF.
2. Create Inner Class which implements UDAFEvaluator
3. Implement the five methods described below:
init() – The init() method initializes the evaluator and resets its internal state. We are using new Column() in code below to indicate that no values have been aggregated yet.
iterate() – this method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are accumulating a sum and a count – see below). We return true to indicate that the input was valid.
terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
merge() – this method is called when Hive decides to combine one partial aggregation with another.
terminate() – this method is called when the final result of the aggregation is needed.
4. Compile and Package JAR
5. CREATE TEMPORARY FUNCTION in hive CLI
6. Run Aggregation Query – Verify Output!!!
package org.hardik.letsdobigdata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.hardik.letsdobigdata.MeanUDAF.MeanUDAFEvaluator.Column;
@Description(name = "Mean", value = "_FUNC_(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {
    // Define Logging
    static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

    /**
     * Evaluator that computes the arithmetic mean of the values it is fed.
     * It keeps a running sum and a running count in a {@link Column} and
     * divides them when the final result is requested.
     */
    public static class MeanUDAFEvaluator implements UDAFEvaluator {
        /**
         * Use Column class to serialize intermediate computation.
         * This is our groupByColumn: the partial state (sum and count) of one group.
         */
        public static class Column {
            double sum = 0;
            // long rather than int so that very large groups cannot overflow the counter
            long count = 0;
        }

        // Current aggregation state; replaced with a fresh Column by init().
        private Column col = null;

        /** Creates the evaluator and resets it to the "nothing aggregated yet" state. */
        public MeanUDAFEvaluator() {
            super();
            init();
        }

        // A - Initialize evaluator - indicating that no values have been
        // aggregated yet.
        public void init() {
            LOG.debug("Initialize evaluator");
            col = new Column();
        }

        // B - Iterate every time there is a new value to be aggregated
        /**
         * Adds one value to the running sum and increments the running count.
         *
         * @param value the value to aggregate
         * @return always {@code true}
         * @throws HiveException if the aggregation state has not been initialized
         */
        public boolean iterate(double value) throws HiveException {
            LOG.debug("Iterating each value for aggregation");
            if (col == null) {
                throw new HiveException("Item is not initialized");
            }
            col.sum += value;
            col.count++;
            return true;
        }

        // C - Called when Hive wants partially aggregated results.
        /**
         * @return the current partial state (sum and count)
         */
        public Column terminatePartial() {
            LOG.debug("Return partially aggregated results");
            return col;
        }

        // D - Called when Hive decides to combine one partial aggregation with another
        /**
         * Folds another partial state into this evaluator's state.
         *
         * @param other partial state to merge; {@code null} is ignored
         * @return always {@code true}
         */
        public boolean merge(Column other) {
            LOG.debug("Merging by combining partial aggregation");
            if (other == null) {
                return true;
            }
            col.sum += other.sum;
            col.count += other.count;
            return true;
        }

        // E - Called when the final result of the aggregation is needed.
        /**
         * @return sum divided by count; this is {@code NaN} when nothing has
         *         been aggregated (0.0 / 0)
         */
        public double terminate() {
            LOG.debug("At the end of last record of the group - returning final result");
            return col.sum / col.count;
        }
    }
}
-- Comma-delimited text table with one order per line.
CREATE TABLE IF NOT EXISTS orders (
    order_id    INT,
    order_date  STRING,
    customer_id INT,
    amount      INT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

-- Load the sample rows from the local file system into the table.
LOAD DATA LOCAL INPATH '/root/utaf.txt'
INTO TABLE orders;
101,2016-01-01,7,3540
102,2016-03-01,1,240
103,2016-03-02,6,2340
104,2016-02-12,3,5000
105,2016-02-12,3,5500
106,2016-02-14,9,3005
107,2016-02-14,1,20
108,2016-02-29,2,2000
109,2016-02-29,3,2500
110,2016-02-27,1,200
-- Make the packaged UDAF classes available to the session.
ADD JAR /root/HiveUDFs-master-0.0.1-SNAPSHOT.jar;

-- Register the UDAF class under the name MeanFunc for this session.
CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';

-- Mean order amount per customer.
SELECT customer_id, MeanFunc(amount)
FROM orders
GROUP BY customer_id;
1. Create a Java class which extends org.apache.hadoop.hive.ql.exec.UDAF.
2. Create Inner Class which implements UDAFEvaluator
3. Implement the five methods described below:
init() – The init() method initializes the evaluator and resets its internal state. We are using new Column() in code below to indicate that no values have been aggregated yet.
iterate() – this method is called every time there is a new value to be aggregated. The evaluator should update its internal state with the result of performing the aggregation (we are accumulating a sum and a count – see below). We return true to indicate that the input was valid.
terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
merge() – this method is called when Hive decides to combine one partial aggregation with another.
terminate() – this method is called when the final result of the aggregation is needed.
4. Compile and Package JAR
5. CREATE TEMPORARY FUNCTION in hive CLI
6. Run Aggregation Query – Verify Output!!!
package org.hardik.letsdobigdata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.hardik.letsdobigdata.MeanUDAF.MeanUDAFEvaluator.Column;
@Description(name = "Mean", value = "_FUNC_(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {
    // Define Logging
    static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

    /**
     * Evaluator that computes the arithmetic mean of the values it is fed.
     * It keeps a running sum and a running count in a {@link Column} and
     * divides them when the final result is requested.
     */
    public static class MeanUDAFEvaluator implements UDAFEvaluator {
        /**
         * Use Column class to serialize intermediate computation.
         * This is our groupByColumn: the partial state (sum and count) of one group.
         */
        public static class Column {
            double sum = 0;
            // long rather than int so that very large groups cannot overflow the counter
            long count = 0;
        }

        // Current aggregation state; replaced with a fresh Column by init().
        private Column col = null;

        /** Creates the evaluator and resets it to the "nothing aggregated yet" state. */
        public MeanUDAFEvaluator() {
            super();
            init();
        }

        // A - Initialize evaluator - indicating that no values have been
        // aggregated yet.
        public void init() {
            LOG.debug("Initialize evaluator");
            col = new Column();
        }

        // B - Iterate every time there is a new value to be aggregated
        /**
         * Adds one value to the running sum and increments the running count.
         *
         * @param value the value to aggregate
         * @return always {@code true}
         * @throws HiveException if the aggregation state has not been initialized
         */
        public boolean iterate(double value) throws HiveException {
            LOG.debug("Iterating each value for aggregation");
            if (col == null) {
                throw new HiveException("Item is not initialized");
            }
            col.sum += value;
            col.count++;
            return true;
        }

        // C - Called when Hive wants partially aggregated results.
        /**
         * @return the current partial state (sum and count)
         */
        public Column terminatePartial() {
            LOG.debug("Return partially aggregated results");
            return col;
        }

        // D - Called when Hive decides to combine one partial aggregation with another
        /**
         * Folds another partial state into this evaluator's state.
         *
         * @param other partial state to merge; {@code null} is ignored
         * @return always {@code true}
         */
        public boolean merge(Column other) {
            LOG.debug("Merging by combining partial aggregation");
            if (other == null) {
                return true;
            }
            col.sum += other.sum;
            col.count += other.count;
            return true;
        }

        // E - Called when the final result of the aggregation is needed.
        /**
         * @return sum divided by count; this is {@code NaN} when nothing has
         *         been aggregated (0.0 / 0)
         */
        public double terminate() {
            LOG.debug("At the end of last record of the group - returning final result");
            return col.sum / col.count;
        }
    }
}
-- Comma-delimited text table with one order per line.
CREATE TABLE IF NOT EXISTS orders (
    order_id    INT,
    order_date  STRING,
    customer_id INT,
    amount      INT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

-- Load the sample rows from the local file system into the table.
LOAD DATA LOCAL INPATH '/root/utaf.txt'
INTO TABLE orders;
101,2016-01-01,7,3540
102,2016-03-01,1,240
103,2016-03-02,6,2340
104,2016-02-12,3,5000
105,2016-02-12,3,5500
106,2016-02-14,9,3005
107,2016-02-14,1,20
108,2016-02-29,2,2000
109,2016-02-29,3,2500
110,2016-02-27,1,200
-- Make the packaged UDAF classes available to the session.
ADD JAR /root/HiveUDFs-master-0.0.1-SNAPSHOT.jar;

-- Register the UDAF class under the name MeanFunc for this session.
CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';

-- Mean order amount per customer.
SELECT customer_id, MeanFunc(amount)
FROM orders
GROUP BY customer_id;
No comments:
Post a Comment