The post below describes how to configure Flume to consume data from a Kafka topic (the source) and deliver it to an HDFS target.

Pipeline flow
Original source > Kafka topic > Flume (Kafka source) > HDFS
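For testing, the "original source" can be as simple as the Kafka console producer pushing a few lines onto the topic. This is just a sketch: the broker host and port (n01.domain.com:9092) are assumptions, since the post itself only lists the ZooKeeper quorum and the topic name web-sql-log; adjust them for your cluster.

# Publish a sample line to the topic that the Flume source will read from
echo '76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281' | \
  kafka-console-producer --broker-list n01.domain.com:9092 --topic web-sql-log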
Create a run file
agent_name=flume1
flume-ng agent -n $agent_name -c conf -f conf/flume.conf

# Debug to screen
flume-ng agent -n $agent_name -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console
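If you want the agent to keep running after you log out, one simple option is to save the commands above into a small script and launch it with nohup. The script name run-flume.sh below is hypothetical, not from the original post.

# Assumes the two lines above were saved as run-flume.sh
chmod +x run-flume.sh
nohup ./run-flume.sh > flume1.out 2>&1 &   # detach the agent from the terminal
tail -f flume1.out                         # follow the agent output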
Create configuration files

Create the environment file
mkdir conf

# conf/flume-env.sh
# Give Flume more memory, pre-allocate it, and enable remote monitoring via JMX
export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
export FLUME_CLASSPATH="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/zookeeper-3.4.5-cdh5.4.0.jar"
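The bare -Dcom.sun.management.jmxremote flag only enables local JMX access. If you want to attach JConsole or a monitoring agent from another host, you can pin a port and, for a lab setup, disable authentication and SSL using the standard JVM properties shown below. The port 5445 is an arbitrary choice, not something from the original post.

export JAVA_OPTS="-Xms2048m -Xmx2048m \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=5445 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"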
Create the Flume configuration file
# conf/flume.conf
flume1.sources = kafkaSource
flume1.channels = memoryChannel
flume1.sinks = hdfsSink

flume1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafkaSource.zookeeperConnect = n01.domain.com:2181,n02.domain.com:2181
flume1.sources.kafkaSource.topic = web-sql-log
flume1.sources.kafkaSource.batchSize = 5
flume1.sources.kafkaSource.batchDurationMillis = 200
flume1.sources.kafkaSource.channels = memoryChannel
#flume1.sources.kafkaSource.groupId = new
flume1.sources.kafkaSource.groupId = flume-sql-logs
flume1.sources.kafkaSource.kafka.auto.offset.reset = smallest

# Use this to modify the source message
#flume1.sources.kafkaSource.interceptors = i1
# Regex interceptor to set a timestamp header so the HDFS sink can write to partitioned paths
#flume1.sources.kafkaSource.interceptors.i1.type = regex_extractor
#flume1.sources.kafkaSource.interceptors.i1.serializers = s1
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.name = timestamp
## Match this logfile format to extract the timestamp from it:
## 76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"
#flume1.sources.kafkaSource.interceptors.i1.regex = (\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s+\d{4})
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z
#
# http://flume.apache.org/FlumeUserGuide.html#memory-channel
flume1.channels.memoryChannel.type = memory
flume1.channels.memoryChannel.capacity = 100
flume1.channels.memoryChannel.transactionCapacity = 100

## Write to HDFS
# http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
flume1.sinks.hdfsSink.type = hdfs
flume1.sinks.hdfsSink.channel = memoryChannel
flume1.sinks.hdfsSink.hdfs.kerberosPrincipal = flume/n01.domain.com@DEVTECH101.COM
flume1.sinks.hdfsSink.hdfs.kerberosKeytab = /var/tmp/elik/flume.keytab
flume1.sinks.hdfsSink.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfsSink.hdfs.fileType = DataStream
flume1.sinks.hdfsSink.hdfs.writeFormat = Text
flume1.sinks.hdfsSink.hdfs.rollSize = 0
flume1.sinks.hdfsSink.hdfs.batchSize = 100
flume1.sinks.hdfsSink.hdfs.rollCount = 10000
flume1.sinks.hdfsSink.hdfs.rollInterval = 600
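Because the HDFS sink authenticates with Kerberos, it is worth confirming that the keytab actually contains the principal referenced in the config before starting the agent. The commands below use the standard MIT Kerberos tools with the exact keytab path and principal from the configuration above.

# Inspect the keytab and verify the principal can obtain a ticket
klist -kt /var/tmp/elik/flume.keytab
kinit -kt /var/tmp/elik/flume.keytab flume/n01.domain.com@DEVTECH101.COM
klist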
Start the Flume agent

Start the Flume agent with the run file created above, and data will begin flowing from the Kafka topic through Flume into HDFS.
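To verify the pipeline end to end, check the dated directory defined by hdfs.path once the sink's batch and roll thresholds (100 events per batch, 10000 events or 600 seconds per roll) have been reached. A minimal check, assuming the same /tmp/kafka path from the config and Flume's default FlumeData file prefix:

# List the per-day output directories and peek at a rolled file
hdfs dfs -ls /tmp/kafka/web-sql-log/
hdfs dfs -cat /tmp/kafka/web-sql-log/*/FlumeData.* | head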