Wednesday, November 16, 2016

Hive Filter to skip .tmp files while querying

1. Create a class that implements PathFilter
package com.hivefilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
public class FileFilterExcludeTmpFiles implements PathFilter {
        // Skip hidden/temporary files: names starting with "_" or "." and files ending with ".tmp"
        public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith(".tmp");
        }
}

2. Create a jar file, copy it to /usr/lib/hive/lib, and add the property below to /etc/hive/conf/hive-site.xml.

 
<property>
  <name>mapred.input.pathFilter.class</name>
  <value>com.hivefilter.FileFilterExcludeTmpFiles</value>
</property>

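A quick way to sanity-check the filter logic outside Hive (a minimal sketch; FilterCheck is just an illustrative class name and only needs hadoop-common on the classpath):

package com.hivefilter;
import org.apache.hadoop.fs.Path;

public class FilterCheck {
        public static void main(String[] args) {
                FileFilterExcludeTmpFiles filter = new FileFilterExcludeTmpFiles();
                System.out.println(filter.accept(new Path("/data/part-00000")));     // true
                System.out.println(filter.accept(new Path("/data/part-00000.tmp"))); // false - in-flight file
                System.out.println(filter.accept(new Path("/data/_SUCCESS")));       // false - marker file
        }
}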
Creating Hive UDF and Using it


Create a class that extends UDF

package com;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;

public class TestUDF extends UDF {

public TestUDF(){
}
 public Text evaluate(Text str)  {
     if (str == null) {
         return null; // pass NULL through instead of throwing a NullPointerException
     }
     return new Text(str.toString().toUpperCase());
 }
}


1. Create the jar
jar cvf test.jar com/TestUDF.class
2. Move the jar to HDFS
hdfs dfs -put /home/cloudera/workspace/training/test.jar /user/hive/udf/.
hdfs dfs -chmod 777  /user/hive/udf/test.jar
3. Create the function in Hive
create function my_upper2 as 'com.TestUDF' using jar 'hdfs:///user/hive/udf/test.jar';
4. Use the UDF
select my_upper2(name) from table1;
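
The UDF can also be exercised locally with a plain main method before packaging (a minimal sketch; TestUDFCheck is just an illustrative name):

package com;
import org.apache.hadoop.io.Text;

public class TestUDFCheck {
 public static void main(String[] args) {
     TestUDF udf = new TestUDF();
     // prints HELLO
     System.out.println(udf.evaluate(new Text("hello")));
 }
}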

Consuming the above webservice

We can consume it in two ways:

Way 1 :

XFireClientFactoryBean client = new XFireClientFactoryBean();
try {
    // Point the client at the service interface and the service WSDL
    client.setServiceClass(Class.forName("com.optrasystems.mvc.TestInterface"));
    client.setWsdlDocumentUrl("http://192.168.100.151:8089/thorlabs/app/citationService?WSDL");
    client.afterPropertiesSet();
    // Obtain the proxy and invoke the remote operation
    TestInterface lTestInterface = (TestInterface) client.getObject();
    String str = lTestInterface.sayHello();
    request.getSession().setAttribute("webservice", str);
} catch (java.lang.Exception e) {
    e.printStackTrace();
}

Way 2 : (this uses an Axis web service)

// create proxy object
WeatherSoapProxy lWeatherSoapProxy=new WeatherSoapProxy();

//invoke method
WeatherDescription[] lWeatherDescription=lWeatherSoapProxy.getWeatherInformation();
for(WeatherDescription lWeatherDesc: lWeatherDescription){

System.out.println(lWeatherDesc.getDescription());
System.out.println(lWeatherDesc.getPictureURL());
System.out.println(lWeatherDesc.getWeatherID());
}

Karaf - OSGi container

Apache Karaf is a small OSGi-based runtime which provides a lightweight container onto which various components and applications can be deployed.
Deploying all the requirements (bundles and configurations) of an application into a container is called "provisioning".
In Apache Karaf, application provisioning is done through a Karaf "feature".

=========================
start/stop
service karaf-service status
service karaf-service start
service karaf-service stop

./client -u karaf
=============
exports | grep spm

Run
./start
then connect with the client (-a is the SSH port, -h the host):
./client -a 8101 -h spmdev3.alcatel.com -u karaf

Open a command line console and change the directory to KARAF_HOME.
To start the server, run the following command in Windows:
===========================
1. start karaf
cd %KARAF_HOME%
%KARAF_HOME%\bin\karaf.bat

respectively on Unix:
bin/karaf

2. To check the Karaf status
On Unix:
bin/status
Not Running ...
bin/status
Running ...

On Windows:
bin\status.bat
Not Running ...
bin\status.bat
Running ...

3. To see all bundles (including system bundles): la
4. Logs: C:\SPM_softwares\apache-karaf-4.0.0.M2\data\log
5. Copy the jar to the deploy folder
6. Start the bundle if it is not started automatically: start <bundle ID>
7. RESTART: The shutdown command accepts the -r (--restart) option to restart Apache Karaf:
system:shutdown -r
8. To install dependencies
install wrap:mvn:org.mariadb.jdbc/mariadb-java-client/1.1.7
install wrap:mvn:mysql/mysql-connector-java/5.1.17
install wrap:mvn:commons-dbcp/commons-dbcp/1.4
install wrap:mvn:com.sun.jersey.contribs/jersey-spring/1.9.1
install wrap:mvn:com.sun.jersey/jersey-core/1.10
install wrap:mvn:com.sun.jersey/jersey-json/1.10
install wrap:mvn:com.sun.jersey/jersey-server/1.10
install wrap:mvn:com.sun.jersey/jersey-servlet/1.10
install wrap:mvn:com.sun.jersey/jersey-client/1.18.1
install wrap:mvn:com.fasterxml.jackson.core/jackson-core/2.3.3
install wrap:mvn:com.fasterxml.jackson.core/jackson-databind/2.3.3

9. exports | grep common

10. Update the log level for your project in the org.ops4j.pax.logging.cfg file:

log4j.logger.alu.ausdc = INFO, PORTFW
log4j.logger.com.alu = INFO, PORTFW

log4j.appender.PORTFW = org.apache.log4j.RollingFileAppender
log4j.appender.PORTFW.layout = org.apache.log4j.EnhancedPatternLayout
log4j.appender.PORTFW.layout.ConversionPattern = %d{ISO8601} | %-5.5p | %-16.16c{1} | %m%n
log4j.appender.PORTFW.file = ${karaf.data}/log/gatewayframework.log
log4j.appender.PORTFW.append = true
log4j.appender.PORTFW.MaxFileSize=1MB
log4j.appender.PORTFW.MaxBackupIndex=10
log4j.appender.PORTFW.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.PORTFW.filter.1.StringToMatch=executionduration
log4j.appender.PORTFW.filter.1.AcceptOnMatch=false

#log4j.logger.com.alu.ipprd.spm.collector.CommandLogger=INFO,CommandLog
log4j.logger.com.alu.ipprd.spm.collector.CommandLogger=INFO, CommandLog

log4j.appender.CommandLog = org.apache.log4j.RollingFileAppender
log4j.appender.CommandLog.layout = org.apache.log4j.EnhancedPatternLayout
log4j.appender.CommandLog.layout.ConversionPattern = %d{ISO8601} | %-5.5p | %-16.16c{1} | %m%n
log4j.appender.CommandLog.file = ${karaf.data}/log/commandlog.log
log4j.appender.CommandLog.append = true
log4j.appender.CommandLog.MaxFileSize=1MB
log4j.appender.CommandLog.MaxBackupIndex=10

Crontab - Linux/Unix default scheduler

Steps to set up crontab in Unix/Linux

1. Login to UNIX/LINUX system
2. To run the job early every morning (at 00:01 hrs), run crontab -e and add a line like the one below
$crontab -e
1 0 * * * . /root/.bash_profile; /var/lib/hadoop-hdfs/shekhar/refresh_enrichment_table.sh /var/lib/hadoop-hdfs/shekhar/ >  /var/log/spm/refresh_enrichment_table.log

Note: JAVA_HOME and other env variables have to be set in /root/.bash_profile and sourced in the crontab command as in the line above; otherwise the job will not find the environment variables.

How to debug a Python script from the CLI

To debug script run below command in console:
python -m pdb test.py


Debugging options:
n - step to the next line
s - step into a method
q - quit debugging

Creating a breakpoint:
b cm_utils.py:98  -  set a breakpoint at line 98 of cm_utils.py
c  -  continue to the next breakpoint

Wednesday, November 9, 2016

Kafka Custom Partitioner



1. Create the partitioner class

package com.alu.ipprd.spm.kafkapartition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    public SimplePartitioner(VerifiableProperties props) {
    }

    // Route messages by the last octet of the IP key, e.g. key "192.168.2.37"
    // with 4 partitions gives 37 % 4 = 1, so the message goes to partition 1.
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
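
To see the partition math in isolation, the class can be driven from a plain main method (a minimal sketch; PartitionerCheck is just an illustrative name and the Kafka 0.8 jar must be on the classpath):

package com.alu.ipprd.spm.kafkapartition;

public class PartitionerCheck {
    public static void main(String[] args) {
        SimplePartitioner partitioner = new SimplePartitioner(null);
        // last octet "37" modulo 4 partitions -> prints 1
        System.out.println(partitioner.partition("192.168.2.37", 4));
    }
}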
============================
2. TestProducer.java for testing
package com.alu.ipprd.spm.kafkapartition;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {

    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092,localhost:9093");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Fully qualified name of the partitioner class created in step 1
        props.put("partitioner.class", "com.alu.ipprd.spm.kafkapartition.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            // The key (ip) is what SimplePartitioner uses to pick the partition
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
===========================
3. Maven dependencies

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
  <scope>compile</scope>
  <exclusions>
    <exclusion>
      <artifactId>jmxri</artifactId>
      <groupId>com.sun.jmx</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jms</artifactId>
      <groupId>javax.jms</groupId>
    </exclusion>
    <exclusion>
      <artifactId>jmxtools</artifactId>
      <groupId>com.sun.jdmk</groupId>
    </exclusion>
  </exclusions>
</dependency>


Apache Apex example

To create example apex app using maven:
step 1:
mvn -B archetype:generate -DarchetypeGroupId=org.apache.apex -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion="3.3.0-incubating"  -DgroupId=com.example  -Dpackage=com.example.myapexapp  -DartifactId=myapexapp  -Dversion=1.0-SNAPSHOT

step 2:
Download https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip and set the hadoop.home.dir variable in ApplicationTest.java before running the test class.
------------------------------------
Terminologies in Apex:
Operators: do the actual data processing (e.g. KPI calculation); a pipeline can have many operators.
Unifier: merges partial aggregation results into the final aggregates (sum, count, etc.); a pipeline can have many unifiers.
Checkpoint: stores intermediate state, which is used for fault tolerance; the default checkpoint interval is 30 sec.
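
For reference, a minimal operator sketch against the Apex core API used by the archetype project above (UpperCaseOperator is just an illustrative name, not part of the generated app):

package com.example.myapexapp;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

// Toy operator: receives String tuples on the input port and emits their upper-cased form.
public class UpperCaseOperator extends BaseOperator {

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
    @Override
    public void process(String tuple) {
      output.emit(tuple.toUpperCase());
    }
  };
}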

Flume Opentsdb config

Flume config for OpenTSDB with a Kafka source topic:
===============================
opentsdbflume.sources = kafka-source
opentsdbflume.channels = http-channel
opentsdbflume.sinks = http-sink

opentsdbflume.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
opentsdbflume.sources.kafka-source.zookeeperConnect = 135.250.193.178:2181
opentsdbflume.sources.kafka-source.topic = airtel_ims_entity_cpu_opentsdb
opentsdbflume.sources.kafka-source.batchSize = 100
opentsdbflume.sources.kafka-source.batchDurationMillis = 200
opentsdbflume.sources.kafka-source.channels = http-channel

opentsdbflume.channels.http-channel.type = memory
opentsdbflume.channels.http-channel.transactionCapacity = 1000000
opentsdbflume.channels.http-channel.capacity = 10000000

opentsdbflume.sinks.http-sink.channel = http-channel
opentsdbflume.sinks.http-sink.type = org.apache.flume.sink.HttpSink
opentsdbflume.sinks.http-sink.endpoint = http://135.250.193.178
opentsdbflume.sinks.http-sink.port = 4241
opentsdbflume.sinks.http-sink.resource = /api/put

Zookeeper HA config

URL: http://jayatiatblogs.blogspot.in/2012/11/setting-up-zookeeper-cluster.html
1. Obtain the ZooKeeper setup at some location. The setup can be downloaded from:
http://hadoop.apache.org/zookeeper/releases.html

2. Create a file (e.g. zoo.cfg) in the /etc/zookeeper/conf folder of the copied setup and add the following:
dataDir=/var/zookeeper/                                                                  
clientPort=2181
initLimit=5
syncLimit=2
server.server1=zoo1:2888:3888                              
server.server2=zoo2:2888:3888
server.server3=zoo3:2888:3888                                                      

Here 2888 and 3888 are the quorum and leader-election ports (the example in step 4 below uses 3181 and 4181 instead), and the server id (server.server1) and the host name (zoo1) can be changed by the user. Using the above entries as sample entries, a file named "myid" must next be created in the path specified in dataDir, containing just one entry: the server id. So the first system of the cluster would have a file named "myid" at the path specified in dataDir containing server1, and so on.
To make it clearer, if we are using 3 systems with the IPs 192.192.192.191, .192 and .193,
and zoo1 designates 192.192.192.191, zoo2 designates 192.192.192.192 and zoo3 designates 192.192.192.193,
then the machine 192.192.192.191 should contain a file called myid at /var/zookeeper/ (or whatever dataDir is set to in zoo.cfg) containing the entry
server1
Similarly, the machines 192.192.192.192 and 192.192.192.193 should have the entries server2 and server3 respectively.

3. Update the /etc/hosts file on each machine to add the host names used in the zookeeper configuration. This is needed so that each system knows which machines zoo1, zoo2 and zoo3 refer to.
After the update, the /etc/hosts file on each system in the cluster would have a similar set of entries:
192.192.192.191   zoo1
192.192.192.192   zoo2                                                                    
192.192.192.193   zoo3

4. Optionally add the leaderServes=yes property in zoo.cfg so that the elected leader also serves client connections (the leader is elected automatically; this property does not designate a master node):
  maxClientCnxns=4000
        tickTime=2000
        initLimit=10
        syncLimit=5
        dataDir=/var/lib/zookeeper
        clientPort=2181
        minSessionTimeout=40000
        maxSessionTimeout=40000
        server.server1=192.192.192.191:3181:4181
        server.server2=192.192.192.192:3181:4181
        server.server3=192.192.192.193:3181:4181
        leaderServes=yes

5. This completes the configuration. Next, cd to the zookeeper home on each node and start the cluster by running
/usr/lib/zookeeper/bin/zkServer.sh start

Flume Master HA

http://archive.cloudera.com/cdh/3/flume/UserGuide/#_running_in_distributed_mode

Running in Distributed Mode:
-------------------------------------------
Distributed mode runs the Flume Master on several machines. Therefore the configuration described below should be done on every Master machine, except where noted.

Running the Flume Master in distributed mode provides better fault tolerance than in standalone mode, and scalability for hundreds of nodes.

Configuring machines to run as part of a distributed Flume Master is nearly as simple as standalone mode. As before, flume.master.servers needs to be set, this time to a list of machines:


<property>
  <name>flume.master.servers</name>
  <value>masterA,masterB,masterC</value>
</property>

How many machines do I need? The distributed Flume Master will continue to work correctly as long as more than half the physical machines running it are still working and haven’t crashed. Therefore if you want to survive one fault, you need three machines (because 3-1 = 2 > 3/2). For every extra fault you want to tolerate, add another two machines, so for two faults you need five machines. Note that having an even number of machines doesn’t make the Flume Master any more fault-tolerant - four machines only tolerate one failure, because if two were to fail only two would be left functioning, which is not more than half of four. Common deployments should be well served by three or five machines.

The final property to set is not the same on every machine - every node in the Flume Master must have a unique value for flume.master.serverid.

Note: flume.master.serverid is the only Flume Master property that must be different on every machine in the ensemble.

masterA:

<property>
  <name>flume.master.serverid</name>
  <value>0</value>
</property>

masterB:

<property>
  <name>flume.master.serverid</name>
  <value>1</value>
</property>

masterC:

<property>
  <name>flume.master.serverid</name>
  <value>2</value>
</property>


The value of flume.master.serverid for each node is the index of that node's hostname in the list in flume.master.servers, starting at 0. For example masterB has index 1 in that list. The purpose of this property is to allow each node to uniquely identify itself to the other nodes in the Flume Master.

This is all the configuration required to start a three-node distributed Flume Master. To test this out, we can start the Master process on all three machines:

[flume@masterA] flume master

[flume@masterB] flume master

[flume@masterC] flume master
Each Master process will initially try and contact all other nodes in the ensemble. Until more than half (in this case, two) nodes are alive and contactable, the configuration store will be unable to start, and the Flume Master will not be able to read or write configuration data.

You can check the current state of the ensemble by inspecting the web page for any of the Flume Master machines which by default will be found at, for example, http://masterA:35871.

Flume Custom interceptor


flume1.sources = kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.channels = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks  = hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi


# For each source, channel, and sink, set standard properties
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.zookeeperConnect = 135.250.193.206:2181
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.topic = airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.batchSize = 5
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.batchDurationMillis = 200
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.channels = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi


flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.interceptors = i2
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.interceptors.i2.type = alu.spm.flume.interceptor.CustomInterceptor$Builder
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.type = multiplexing
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.mapping.US = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sources.kafka-source-airtel_ims_p1360com_cpqhocpuutiltable_kpi.selector.default = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi

# Other properties are specific to each type of source, channel, or sink. In this case, we specify the capacity of the memory channel.
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = memory
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.capacity = 10000
flume1.channels.memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi.transactionCapacity = 10000

flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.channel = memory-channel-airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.type = hdfs
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.useLocalTimeStamp = false
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.path =  hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/spm_kpi_data/customername=%{customer}/networkid=%{networkid}/subnetwork=%{subnetwork}/entity_type=%{product}/year=%{year}/month=%{month}/day=%{day}
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.filePrefix = airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.maxOpenFiles=150
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollSize = 0
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.rollInterval = 30
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.writeFormat=Text
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.inUsePrefix=_
flume1.sinks.hdfs-sink-airtel_ims_p1360com_cpqhocpuutiltable_kpi.hdfs.fileType = DataStream

=================================
package alu.spm.flume.interceptor;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

/**
 * @author  Sangala Shakhar Reddy
 *
 */
public class CustomInterceptor implements Interceptor {


private final String yearheader = "year";
private final String monthheader = "month";
private final String dayheader = "day";

private final String customer = "customer";
private final String networkid = "networkid";
private final String subnetwork = "subnetwork";
private final String product = "product";

@Override
public Event intercept(Event event) {

String customerval = null;
String networkidrval = null;
String productVal = null;
String subnetworkval = null;
String yearheaderval = null;
String monthheaderval = null;
String dayheaderval = null;

byte[] eventBody = event.getBody();
try {

String body = new String(eventBody);
// 2015-02-13 00:01:09;00:01:09;-05:00;PT300S;PT300S;comSubscriberPmPerIpTable;1;nltcom01;1;4;10;6;1;1234.06;4321.04;KA;2015-02-13 00:01:09$Airtel$IMS_1$1360COM;last_hour_cpu_utilization_kpi;100
String[] strs = body.split(";");
// The enrichment field is the third-from-last ';'-separated field; it carries
// date#customer#networkid#subnetwork#product values separated by '#'
String res = strs[strs.length - 3];
String[] p = res.split("#");


String aDate = p[0];
// String aDate = "2012-04-13 sdfsd00:00:00";
// String aDate = "2012-4-13";
Pattern datePattern = Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})");
Matcher dateMatcher = datePattern.matcher(aDate);
if (dateMatcher.find()) {
yearheaderval = dateMatcher.group(1);
monthheaderval = dateMatcher.group(2);
dayheaderval = dateMatcher.group(3);
}

customerval = p[1];
networkidrval = p[2];
subnetworkval = p[3];
productVal = p[4];
// event.setBody("test insert".getBytes());
} catch (Exception e) {
e.printStackTrace();
}

Map<String, String> headers = event.getHeaders();
headers.put(customer, customerval);
headers.put(networkid, networkidrval);
headers.put(subnetwork, subnetworkval);
headers.put(product, productVal);
headers.put(yearheader, yearheaderval);
headers.put(monthheader, monthheaderval);
headers.put(dayheader, dayheaderval);

event.setHeaders(headers);
return event;
}

@Override
public void close() {
// nothing to clean up
}

@Override
public void initialize() {
// no initialization required
}

@Override
public List<Event> intercept(List<Event> events) {
for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) {
Event next = intercept(iterator.next());
if (next == null) {
iterator.remove();
}
}
return events;
}

public static class Builder implements Interceptor.Builder {
@Override
public void configure(Context context) {
// no configuration needed
}

@Override
public Interceptor build() {
return new CustomInterceptor();
}
}

}
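
A quick local check of the interceptor outside Flume (a minimal sketch; InterceptorCheck is just an illustrative name and the event body is made up to match the '#'-separated enrichment field the code expects):

package alu.spm.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class InterceptorCheck {
    public static void main(String[] args) {
        // third-from-last ';' field carries date#customer#networkid#subnetwork#product
        String body = "f1;f2;2015-02-13 00:01:09#Airtel#IMS_1#SUBNET_1#1360COM;last_hour_cpu_utilization_kpi;100";
        Event event = new CustomInterceptor().intercept(EventBuilder.withBody(body.getBytes()));
        // prints the headers used for HDFS path partitioning (customer, networkid, year, month, day, ...)
        System.out.println(event.getHeaders());
    }
}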

Flume Elastic Search config

flume1.sources = source1
flume1.channels = channel1
flume1.sinks = k1

flume1.channels.channel1.type = memory
flume1.channels.channel1.capacity = 10000000
flume1.channels.channel1.transactionCapacity = 1000

# For each source, channel, and sink, set standard properties
flume1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.source1.zookeeperConnect = 135.250.193.206:2181
flume1.sources.source1.topic = airtel_ims_p1360com_cpqhocpuutiltable_kpi
flume1.sources.source1.batchSize = 5
flume1.sources.source1.batchDurationMillis = 200
flume1.sources.source1.channels = channel1


flume1.sinks.k1.channel = channel1
flume1.sinks.k1.type = elasticsearch
flume1.sinks.k1.batchSize = 100
flume1.sinks.k1.hostNames = 135.250.193.206:9300
flume1.sinks.k1.indexName = p1360com_cpqhocpuutiltable_index
flume1.sinks.k1.indexType = p1360com_cpqhocpuutiltable_type
flume1.sinks.k1.clusterName = spmdevteam
flume1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

Flume - Snappy compression and tmp files with prefix _

flume1.sources = kafka-source-ip_cardstatus_kpi
flume1.channels = memory-channel-ip_cardstatus_kpi
flume1.sinks  = hdfs-sink-ip_cardstatus_kpi

# For each source, channel, and sink, set standard properties
flume1.sources.kafka-source-ip_cardstatus_kpi.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-ip_cardstatus_kpi.zookeeperConnect = 135.250.193.206:2181
flume1.sources.kafka-source-ip_cardstatus_kpi.topic = ip_cardstatus_kpi_enriched
flume1.sources.kafka-source-ip_cardstatus_kpi.batchSize = 5
flume1.sources.kafka-source-ip_cardstatus_kpi.batchDurationMillis = 200
flume1.sources.kafka-source-ip_cardstatus_kpi.channels = memory-channel-ip_cardstatus_kpi

flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors = i1
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.type=regex_extractor
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.regex = (\\d\\d\\d\\d-\\d\\d-\\d\\d)
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers = s1
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.name = timestamp
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.pattern =yyyy-MM-dd

# Other properties are specific to each type of source, channel, or sink. In this case, we specify the capacity of the memory channel.
flume1.channels.memory-channel-ip_cardstatus_kpi.type = memory
flume1.channels.memory-channel-ip_cardstatus_kpi.capacity = 10000
flume1.channels.memory-channel-ip_cardstatus_kpi.transactionCapacity = 10000

flume1.sinks.hdfs-sink-ip_cardstatus_kpi.channel = memory-channel-ip_cardstatus_kpi
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.type = hdfs
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.useLocalTimeStamp = false
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.path =  hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/ip_cardstatus_kpi/year=%Y/month=%m/day=%d
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.filePrefix = ip_cardstatus_kpi
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.maxOpenFiles=150
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollSize = 0
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollInterval = 30
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.writeFormat=Text
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.inUsePrefix=_
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.fileType = CompressedStream
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.codeC=snappy

Static flume interceptor

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = 135.250.193.206:2181
 tier1.sources.source1.topic = flum_test
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1

 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = static
 tier1.sources.source1.interceptors.i1.key = one
 tier1.sources.source1.interceptors.i1.value = 1

 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000

 tier1.sinks.sink1.type = hdfs
 #tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
 tier1.sinks.sink1.hdfs.path =hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/%{topic}/%{one}
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

Flume config - without source


agent1.sinks = sink1
agent1.channels = channel1

# Describe channel here
agent1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.channel1.brokerList = 135.250.193.206:9092
agent1.channels.channel1.topic = flum_test
agent1.channels.channel1.zookeeperConnect = 135.250.193.206:2181
agent1.channels.channel1.parseAsFlumeEvent=false
                                                                                                                                                                             
agent1.channels.channel1.transactionCapacity = 1000000
agent1.channels.channel1.checkpointInterval = 30000
agent1.channels.channel1.maxFileSize = 2146435071
agent1.channels.channel1.capacity= 10000000

agent1.sinks.sink1.channel = channel1

# Describe sink1                                                                                                                                                                                                                          
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =  hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/flum_test/year=%Y/month=%m/day=%d
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest

agent1.sinks.sink1.hdfs.rollInterval = 600
agent1.sinks.sink1.hdfs.rollSize = 0
agent1.sinks.sink1.hdfs.rollCount = 10000
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.maxOpenFiles=50
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
agent1.sinks.sink1.hdfs.threadsPoolSize=100
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1

Steps to install flume

service iptables stop
service ip6tables stop
sudo yum install flume-ng flume-ng-agent flume-ng-doc
sudo yum install flume-ng-agent
sudo yum install flume-ng-doc


If Flume is not coming up, then run the commands below:
sudo service cloudera-scm-agent hard_stop
service cloudera-scm-agent start

Scheduling workflows using Oozie coordinator

1. Create a folder, and a lib folder under it, in HDFS
hadoop fs -mkdir /user/oozie/shekhar
hadoop fs -mkdir /user/oozie/shekhar/lib
2. Copy the workflow.xml and coordinator.xml files to the HDFS location /user/oozie/shekhar
hadoop fs -put workflow.xml /user/oozie/shekhar/
3. Copy your jars to the HDFS location /user/oozie/shekhar/lib
hadoop fs -put sparkanalitics-1.jar /user/oozie/shekhar/lib/
4. Create the coordinatorjob.properties and simpleSparkTest.sh files in the current dir (not in HDFS)
5. Submit the job using the oozie command from the current dir
$ oozie job -oozie http://localhost:11000/oozie -config coordinatorjob.properties -submit
job: 0000673-120823182447665-oozie-hado-C

Logs will be generated in the /root/oozie-oozi/0000673-120823182447665-oozie-hado-C folder

Suspending the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -suspend 0000673-120823182447665-oozie-hado-C
Resuming a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -resume 0000673-120823182447665-oozie-hado-C
Killing a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -kill 0000673-120823182447665-oozie-hado-C
Rerunning a Coordinator Action or Multiple Actions
$ oozie job -rerun 0000673-120823182447665-oozie-hado-C [-nocleanup]
[-refresh][-action 1,3-5] [-date 2012-01-01T01:00Z::2012-05-31T23:59Z, 2012-11-10T01:00Z, 2012-12-31T22:00Z]
-action or -date is required to rerun. If neither -action nor -date is given, an exception will be thrown.

Checking the Status of a Coordinator/Workflow job or a Coordinator Action
$ oozie job -oozie http://localhost:11000/oozie -info 0000673-20823182447665-oozie-hado-C

The info option can display information about a workflow job or coordinator job or coordinator action.
============================
workflow.xml

<!-- node and app names here are placeholders; host, command and kill message are from the original -->
<workflow-app xmlns="uri:oozie:workflow:0.4" name="spark-ssh-wf">
    <start to="sshAction"/>
    <action name="sshAction">
        <ssh xmlns="uri:oozie:ssh-action:0.1">
            <host>root@cooper.alcatel.com</host>
            <command>/var/lib/hadoop-hdfs/simpleSparkTest.sh</command>
        </ssh>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

=============================
coordinator.xml

<!-- app name here is a placeholder; the properties referenced come from coordinatorjob.properties below -->
<coordinator-app name="spark-coord" frequency="${frequency}"
                 start="${startTime}" end="${endTime}" timezone="${timezone}"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>
============================
simpleSparkTest.sh

cd /var/lib/hadoop-hdfs
spark-submit --jars ./utils-common-1.0.0.jar --master yarn --class alu.ausdc.analitics.sparkEx.log4j.Main sparkanalitics-1.jar 20-01-2015
===============================

coordinatorjob.properties

# run every 5 minutes (coordinator frequency is specified in minutes)
frequency=5
startTime=2012-08-31T20\:20Z
endTime=2013-08-31T20\:20Z
timezone=GMT+0530

nameNode=hdfs://cooper.alcatel.com:8020
jobTracker=cooper.alcatel.com:8021
queueName=default
inputDir=${nameNode}/data.in
outputDir=${nameNode}/out
user.name=training
# give workflow xml path here
workflowPath=${nameNode}/user/oozie/shekhar
# give coordinator xml path here
oozie.coord.application.path=${nameNode}/user/oozie/shekhar