DevTech101

This post describes how to configure Flume to consume data from a Kafka topic (the source) and deliver it to an HDFS target (the sink).
Pipeline flow

Original source > Kafka topic > Flume (Kafka source) > HDFS sink
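The flume.conf below reads from a topic named web-sql-log. If the topic does not exist yet, a minimal sketch for creating it with the stock Kafka CLI follows; the partition and replication counts are illustrative assumptions, and on a plain Apache Kafka install the script is kafka-topics.sh:

# Create the source topic (counts are examples; size for your cluster)
kafka-topics --create \
    --zookeeper n01.domain.com:2181,n02.domain.com:2181 \
    --replication-factor 2 --partitions 4 \
    --topic web-sql-log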

Create a run file

# Agent name - must match the property prefix used in conf/flume.conf
agent_name=flume1

# Start the agent, reading its configuration from conf/flume.conf
flume-ng agent -n $agent_name -c conf -f conf/flume.conf

# Same, but also log to the console for debugging
flume-ng agent -n $agent_name -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console
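Putting the above together as an actual run file, a minimal sketch (the name run.sh is my assumption; the post only shows the commands):

#!/bin/sh
# run.sh - start the flume1 agent defined in conf/flume.conf (hypothetical wrapper)
agent_name=flume1
exec flume-ng agent -n "$agent_name" -c conf -f conf/flume.conf \
    -Dflume.root.logger=INFO,console

Make it executable with chmod +x run.sh, then start the agent with ./run.sh.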

Create configuration files

Create environment file

mkdir conf

conf/flume-env.sh:
# Give Flume more memory, pre-allocate it, and enable JMX monitoring
export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
export FLUME_CLASSPATH="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/zookeeper-3.4.5-cdh5.4.0.jar"
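The FLUME_CLASSPATH jar above is tied to the CDH 5.4.0 parcel. On a different parcel version you can locate the bundled ZooKeeper jar through the active-parcel symlink; a sketch, assuming the standard Cloudera parcel layout:

# Find the ZooKeeper jar shipped with the currently active CDH parcel
ls /opt/cloudera/parcels/CDH/jars/zookeeper-*.jar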
Create configuration file

conf/flume.conf:
flume1.sources = kafkaSource  
flume1.channels = memoryChannel  
flume1.sinks = hdfsSink 
 
flume1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource  
flume1.sources.kafkaSource.zookeeperConnect = n01.domain.com:2181,n02.domain.com:2181
flume1.sources.kafkaSource.topic = web-sql-log  
flume1.sources.kafkaSource.batchSize = 5  
flume1.sources.kafkaSource.batchDurationMillis = 200  
flume1.sources.kafkaSource.channels = memoryChannel
#flume1.sources.kafkaSource.groupId = new
flume1.sources.kafkaSource.groupId = flume-sql-logs
flume1.sources.kafkaSource.kafka.auto.offset.reset = smallest  
# Use an interceptor to modify the source events (commented out by default)
#flume1.sources.kafkaSource.interceptors = i1
# Regex interceptor to set a timestamp header so HDFS files can be written to date-partitioned paths
#flume1.sources.kafkaSource.interceptors.i1.type = regex_extractor  
#flume1.sources.kafkaSource.interceptors.i1.serializers = s1  
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer  
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.name = timestamp  
## Match this format logfile to get timestamp from it:  
## 76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"  
#flume1.sources.kafkaSource.interceptors.i1.regex = (\\d{2}/[a-zA-Z]{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}\\s\\+\\d{4})
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z  
#
 
# http://flume.apache.org/FlumeUserGuide.html#memory-channel  
flume1.channels.memoryChannel.type = memory  
flume1.channels.memoryChannel.capacity = 100
flume1.channels.memoryChannel.transactionCapacity = 100
 
## Write to HDFS  
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink  
flume1.sinks.hdfsSink.type = hdfs  
flume1.sinks.hdfsSink.channel = memoryChannel  
flume1.sinks.hdfsSink.hdfs.kerberosPrincipal = flume/n01.domain.com@DEVTECH101.COM
flume1.sinks.hdfsSink.hdfs.kerberosKeytab = /var/tmp/elik/flume.keytab
flume1.sinks.hdfsSink.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
# The %y-%m-%d escapes require a timestamp header on each event; either enable
# the regex interceptor above or set hdfs.useLocalTimeStamp = true
#flume1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfsSink.hdfs.fileType = DataStream  
flume1.sinks.hdfsSink.hdfs.writeFormat = Text  
flume1.sinks.hdfsSink.hdfs.rollSize = 0  
flume1.sinks.hdfsSink.hdfs.batchSize = 100
flume1.sinks.hdfsSink.hdfs.rollCount = 10000  
flume1.sinks.hdfsSink.hdfs.rollInterval = 600
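Since the sink is Kerberized, it is worth confirming the keytab and HDFS path before starting the agent. A quick sanity check using the values from the config (pre-creating the directory is optional; the sink creates it if permissions allow):

# Verify the keytab works for the configured principal
kinit -kt /var/tmp/elik/flume.keytab flume/n01.domain.com@DEVTECH101.COM

# Optionally pre-create and inspect the HDFS target path
hdfs dfs -mkdir -p /tmp/kafka
hdfs dfs -ls /tmp/kafka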
Start the Flume agent

Start the agent with the run file created earlier, or directly with one of the flume-ng commands shown above.
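Once the agent is up, a simple end-to-end test is to push a message into the topic and watch the date-partitioned directory fill. A sketch follows; the broker host:port is an assumption, and files roll according to the batchSize/rollCount/rollInterval settings above:

# Produce a test event to the source topic
echo "test event $(date)" | kafka-console-producer \
    --broker-list n01.domain.com:9092 --topic web-sql-log

# Files appear under the topic/date path once a roll criterion is met
hdfs dfs -ls /tmp/kafka/web-sql-log/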
