Wednesday, November 9, 2016

Flume Custom Interceptor


# Components of the agent named "flume1": one Kafka source, one memory channel and one HDFS sink.
# The names declared here are the keys used by every flume1.sources.*, flume1.channels.* and
# flume1.sinks.* property further down.
flume1.sources = kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.channels = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks  = hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi


# Kafka source: reads the KPI topic and hands events to the memory channel.
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.zookeeperConnect = 135.250.193.206:2181
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.topic = airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.batchSize = 5
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.batchDurationMillis = 200
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.channels = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi

# Custom interceptor that adds the customer/networkid/subnetwork/product/year/month/day headers.
# The type must be the fully-qualified name of the interceptor's nested Builder class, i.e. it has
# to match the Java package the class is compiled in (alu.spm.flume.interceptor).
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.interceptors = i2
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.interceptors.i2.type = alu.spm.flume.interceptor.CustomInterceptor$Builder

# Multiplexing selector. Both the "US" mapping and the default route to the same channel, so every
# event ends up in the single memory channel whatever the selector header contains.
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.type = multiplexing
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.mapping.US = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.default = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi

# In-memory channel between the Kafka source and the HDFS sink.
# capacity and transactionCapacity are both set to 10000 here, i.e. a single transaction is
# allowed to be as large as the whole channel.
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = memory
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.capacity = 10000
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.transactionCapacity = 10000

# HDFS sink: drains the memory channel into a partitioned directory tree under the Hive warehouse.
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.channel = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = hdfs
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.useLocalTimeStamp = false
# Every %{name} escape below is filled from the event header of that name. The headers customer,
# networkid, subnetwork, product, year, month and day are the ones written by CustomInterceptor.
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.path =  hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/spm_kpi_data/customername=%{customer}/networkid=%{networkid}/subnetwork=%{subnetwork}/entity_type=%{product}/year=%{year}/month=%{month}/day=%{day}
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.filePrefix = airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.maxOpenFiles=150
# Rolling: per the Flume HDFS sink documentation a value of 0 turns the size- and count-based
# triggers off, which leaves the 30-second interval as the only roll trigger.
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollSize = 0
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollInterval = 30
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.writeFormat=Text
# Files still being written get a "_" name prefix - presumably so that Hive skips them until they
# are closed; confirm against the consumers of this directory.
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.inUsePrefix=_
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.fileType = DataStream

=================================
package alu.spm.flume.interceptor;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * Flume interceptor that copies partitioning fields out of the event body into event headers.
 *
 * <p>The body is treated as a {@code ';'}-separated record. The third field from the end is split
 * on {@code '#'} and read positionally: date, customer, network id, sub-network, product. The date
 * part is searched for a {@code yyyy-MM-dd} sequence, which supplies the {@code year},
 * {@code month} and {@code day} headers.
 *
 * <p>Events are never dropped. A header is written only when its value could be extracted from
 * the body; a malformed body is reported through {@code java.util.logging} and the event is
 * passed on with whatever headers could be set.
 *
 * @author  Sangala Shakhar Reddy
 */
public class CustomInterceptor implements Interceptor {

    private static final Logger LOG = Logger.getLogger(CustomInterceptor.class.getName());

    /** Charset used to decode the event body; fixed so the result does not depend on the host. */
    private static final Charset BODY_CHARSET = Charset.forName("UTF-8");

    /** Finds the first {@code yyyy-MM-dd} sequence; compiled once because Pattern is immutable. */
    private static final Pattern DATE_PATTERN = Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})");

    private static final String RECORD_SEPARATOR = ";";
    private static final String COMPOSITE_SEPARATOR = "#";

    /** The composite field is the third field counted from the end of the record. */
    private static final int COMPOSITE_OFFSET_FROM_END = 3;

    /** Number of '#'-separated parts the composite field is expected to carry. */
    private static final int COMPOSITE_PART_COUNT = 5;

    private static final String YEAR_HEADER = "year";
    private static final String MONTH_HEADER = "month";
    private static final String DAY_HEADER = "day";
    private static final String CUSTOMER_HEADER = "customer";
    private static final String NETWORK_ID_HEADER = "networkid";
    private static final String SUBNETWORK_HEADER = "subnetwork";
    private static final String PRODUCT_HEADER = "product";

    /** No-op: the interceptor holds no resources. */
    @Override
    public void initialize() {
    }

    /**
     * Adds the partition headers derived from the body to {@code event}.
     *
     * @param event the event to enrich; its header map is modified in place
     * @return the same event instance, never {@code null}
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        populateHeaders(event.getBody(), headers);
        // Kept from the original in case an Event implementation hands out a copy of its headers.
        event.setHeaders(headers);
        return event;
    }

    /**
     * Applies {@link #intercept(Event)} to every event of the batch.
     *
     * @param events the batch to process; modified in place
     * @return the same list, with any event for which the single-event method returned
     *         {@code null} removed
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> it = events.iterator(); it.hasNext();) {
            if (intercept(it.next()) == null) {
                it.remove();
            }
        }
        return events;
    }

    /** No-op: the interceptor holds no resources. */
    @Override
    public void close() {
    }

    /**
     * Parses {@code body} and writes every header that can be extracted into {@code headers}.
     * Headers whose value is missing are left untouched rather than being set to {@code null}.
     */
    private static void populateHeaders(byte[] body, Map<String, String> headers) {
        if (body == null) {
            LOG.warning("Event has no body; partition headers not set");
            return;
        }

        // Sample record taken from the original source:
        // 2015-02-13 00:01:09;00:01:09;-05:00;PT300S;PT300S;comSubscriberPmPerIpTable;1;nltcom01;1;4;10;6;1;1234.06;4321.04;KA;2015-02-13 00:01:09$Airtel$IMS_1$1360COM;last_hour_cpu_utilization_kpi;100
        // NOTE(review): the sample separates the composite field with '$' and has four parts,
        // while the code splits on '#' and reads five - confirm the format with the producer.
        String[] fields = new String(body, BODY_CHARSET).split(RECORD_SEPARATOR);
        if (fields.length < COMPOSITE_OFFSET_FROM_END) {
            LOG.warning("Event body has only " + fields.length
                    + " ';'-separated field(s); partition headers not set");
            return;
        }

        String[] parts = fields[fields.length - COMPOSITE_OFFSET_FROM_END].split(COMPOSITE_SEPARATOR);
        if (parts.length < COMPOSITE_PART_COUNT) {
            LOG.warning("Composite field has " + parts.length + " '#'-separated part(s), expected "
                    + COMPOSITE_PART_COUNT + "; some partition headers not set");
        }

        if (parts.length > 0) {
            Matcher date = DATE_PATTERN.matcher(parts[0]);
            if (date.find()) {
                headers.put(YEAR_HEADER, date.group(1));
                headers.put(MONTH_HEADER, date.group(2));
                headers.put(DAY_HEADER, date.group(3));
            }
        }

        putIfPresent(headers, CUSTOMER_HEADER, parts, 1);
        putIfPresent(headers, NETWORK_ID_HEADER, parts, 2);
        putIfPresent(headers, SUBNETWORK_HEADER, parts, 3);
        putIfPresent(headers, PRODUCT_HEADER, parts, 4);
    }

    /** Stores {@code parts[index]} under {@code name} when that position exists. */
    private static void putIfPresent(Map<String, String> headers, String name, String[] parts, int index) {
        if (index < parts.length) {
            headers.put(name, parts[index]);
        }
    }

    /** Builder referenced from the agent configuration to create the interceptor. */
    public static class Builder implements Interceptor.Builder {

        /** No-op: the interceptor takes no configuration. */
        @Override
        public void configure(Context context) {
        }

        /** @return a new {@link CustomInterceptor} */
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }
    }

}

No comments: