Wednesday, November 9, 2016

Flume - Snappy compression and in-use temp files with prefix _

# Agent "flume1": one Kafka source, one memory channel, one HDFS sink.
flume1.sinks = hdfs-sink-ip_cardstatus_kpi
flume1.channels = memory-channel-ip_cardstatus_kpi
flume1.sources = kafka-source-ip_cardstatus_kpi

# Kafka source: reads the ip_cardstatus_kpi_enriched topic and hands the events
# to the memory channel.
flume1.sources.kafka-source-ip_cardstatus_kpi.channels = memory-channel-ip_cardstatus_kpi
flume1.sources.kafka-source-ip_cardstatus_kpi.type = org.apache.flume.source.kafka.KafkaSource
# ZooKeeper endpoint (host:port) used to reach the Kafka cluster.
flume1.sources.kafka-source-ip_cardstatus_kpi.zookeeperConnect = 135.250.193.206:2181
flume1.sources.kafka-source-ip_cardstatus_kpi.topic = ip_cardstatus_kpi_enriched
# Batching towards the channel: a batch is limited to 5 messages or 200 ms
# (see the Flume Kafka source documentation for the exact semantics).
flume1.sources.kafka-source-ip_cardstatus_kpi.batchDurationMillis = 200
flume1.sources.kafka-source-ip_cardstatus_kpi.batchSize = 5

# Interceptor i1 (regex_extractor): captures the first yyyy-MM-dd date found in
# the event body and stores it, converted to epoch milliseconds, in the
# "timestamp" header. The HDFS sink path uses %Y/%m/%d escapes, so this header
# decides which year=/month=/day= directory an event is written to.
#
# NOTE: this file is parsed as a Java properties file, where a backslash is an
# escape character and is silently dropped in front of "d". Every regex
# backslash must therefore be doubled; a single "\d" would reach Flume as a
# literal "d" and the pattern would never match a date.
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors = i1
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.type = regex_extractor
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.regex = (\\d\\d\\d\\d-\\d\\d-\\d\\d)
# Serializer s1 turns capture group 1 into the millisecond value of the header.
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers = s1
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.name = timestamp
# Date format used to parse the captured text; must agree with the regex above.
flume1.sources.kafka-source-ip_cardstatus_kpi.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd

# In-memory channel buffering events between the Kafka source and the HDFS sink.
# Both the overall capacity and the per-transaction capacity are set to 10000.
flume1.channels.memory-channel-ip_cardstatus_kpi.transactionCapacity = 10000
flume1.channels.memory-channel-ip_cardstatus_kpi.capacity = 10000
flume1.channels.memory-channel-ip_cardstatus_kpi.type = memory

# HDFS sink: drains the memory channel into year=/month=/day= directories under
# the Hive warehouse path of spm_database.db/ip_cardstatus_kpi.
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.type = hdfs
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.channel = memory-channel-ip_cardstatus_kpi

# Destination. With useLocalTimeStamp = false the %Y/%m/%d escapes are resolved
# from each event's "timestamp" header rather than from the agent's clock.
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.path = hdfs://135.250.193.206:8020/user/hive/warehouse/spm_database.db/ip_cardstatus_kpi/year=%Y/month=%m/day=%d
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.useLocalTimeStamp = false
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.filePrefix = ip_cardstatus_kpi
# Files still being written are named with a leading "_" (temporary-file prefix).
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.inUsePrefix = _
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.maxOpenFiles = 150

# Rolling: only the 30-second interval is active; size and count are set to 0.
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollInterval = 30
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.rollSize = 0

# Output encoding: text records written as a Snappy-compressed stream.
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.fileType = CompressedStream
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.codeC = snappy
flume1.sinks.hdfs-sink-ip_cardstatus_kpi.hdfs.writeFormat = Text

No comments: