Wednesday, November 9, 2016

Static flume interceptor

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = 135.250.193.206:2181
 tier1.sources.source1.topic = flum_test
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1

 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = static
 tier1.sources.source1.interceptors.i1.key = one
 tier1.sources.source1.interceptors.i1.value = 1

 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000

 tier1.sinks.sink1.type = hdfs
 #tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
 tier1.sinks.sink1.hdfs.path =hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/%{topic}/%{one}
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel = channel1

No comments: