Introduction to Data Science Part 9: Flume advanced concepts
Parag Ray
7-Feb-2015
Introduction
Welcome to the readers!
This is the ninth part of this series of articles. Here we look at the need to upload data into HDFS using data acquisition tools, and we explore Flume tuning as part of that strategy.
If you have not gone through them already, you will need to go back to the eighth part (Flume introduction) and the fourth part, where the multi-node set up is discussed (see related readings), as this post depends on both the multi-node set up and the Flume introduction.
Please refer to the related readings and target audience sections for help in following the blog.
We are using Ubuntu 14.04, Hadoop 1.2.1, Flume 1.5.2 and Java 1.7.
Disclaimer
I do not represent any particular organization in providing these details. Implementation of the tools and techniques discussed in this presentation should be done by a responsible implementer capable of undertaking such tasks.
Agenda
- Target audience
- Related readings/other blogs
- Overall objective of case study
- Create test infrastructure with the log4j Flume appender
- Use the test infrastructure to simulate a capacity overflow situation
- Case study: resolution of capacity issues
- Create a tiered topology of chained Flume agents for redundancy and load balancing
- Use Ganglia to monitor the load behaviour.
- This is an intermediate level discussion on Hadoop and related tools.
- Best suited for an audience looking for an introduction to this technology.
- Prior knowledge of Java and Linux is required.
- An intermediate level understanding of networking is necessary.
This is the ninth article of this series of blogs; shortcuts to the other articles are available in the pages tab.
You may also like to look at the Flume, Cloudera and Hadoop home pages for further details.
Overall objective of case study
The objective of this blog is primarily to expose Flume, as a Java process, to varying degrees of stress and to see how it behaves. Real world applications involve various load levels and complexities, and a test bed that allows those stress levels to be simulated will definitely help design decisions.
- To capture logs with the log4j Flume appender and start putting them into Hadoop.
- To establish the flexibility of Flume, we need to inspect some frequently occurring issues and try to overcome them.
- To keep our focus on Flume, we shall create a Java application which is simple but can generate whatever volume of log4j entries we need. This will be a small stand-alone application that creates log4j entries; using the log4j2 Flume appender, we shall capture the logs directly.
- We shall observe the behavior of this process, check whether the log capture is robust, and see how it behaves as load increases. As problems are identified we shall try to fix them.
- We shall analyze the Flume process with a profiler and see what the memory profile reveals about the footprint and resource intensity of Flume.
- We shall try to see if capacity bottlenecks can be overcome by chaining flume agents.
- We shall monitor the high load behavior of flume with Ganglia.
Creating test bed
We shall use the following tools to create the application. We expect you to have them already; if not, please set them up:
- Eclipse (we are using Luna Service Release 1 (4.4.1)); you can use any recent version
- Log4j 2.1
- JProfiler 8.1.2 (you can use it free for 10 days if you are concerned about JProfiler being a commercial profiler); you can use another profiler, but we shall cover JProfiler here
- Flume 1.5.2: please download and extract the tar archive; for detailed set up please refer to the earlier blog covering Flume set up and an initial sample run
Creating source application
In Eclipse, create a Java project and include the log4j libraries as well as the Flume libraries, in addition to the defaults that you will in any case have for log4j.
You can ignore the source and javadoc jars, but I suggest keeping them as part of the project.
Create a package pr.edu.loggen and, in that package, a Java class named LogGenerator.java.
For the Flume jars, please refer to the lib folder of the Flume directory.
Copy the source below:
package pr.edu.loggen;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LogGenerator {

    static Logger log = LogManager.getLogger(LogGenerator.class.getName());
    private static long itercnt = 0;

    public static void main(String[] args) {
        itercnt = Long.parseLong(args[0]);
        System.out.println(itercnt);
        long i = 0;
        log.info("starting log generation till i=" + itercnt);
        for (i = 0; i < itercnt; i++) {
            log.debug("Hello this is an debug message i " + i);
            log.info("Hello this is an info message i " + i);
        }
        System.out.println(itercnt);
    }
}
The class simply takes a number as an argument and writes that many pairs of log entries in a loop. We can run it with various numbers, and also launch parallel instances, to increase the load as much as we need; a command line sketch follows.
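If you prefer to drive the generator from the command line rather than from Eclipse, a minimal sketch is given below; the bin, config and lib paths are assumptions for where the compiled classes, log4j2.xml and the log4j2/Flume jars live, so adjust them to your own layout.
# run one instance of the generator for 100 iterations (classpath entries are assumptions)
java -cp "bin:config:lib/*" pr.edu.loggen.LogGenerator 100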
Next comes the log4j configuration file, which is an XML file.
Create a folder such as config and put the file log4j2.xml there, making sure it is on the classpath.
Please refer to the folder structure showing the class file and the log4j config file.
You can use the log4j configuration below:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug" name="Flume" verbose="false" monitorInterval="30">
  <MarkerFilter marker="EVENT" onMatch="ACCEPT" onMismatch="NEUTRAL" />
  <Appenders>
    <Flume name="flume" ignoreExceptions="false" mdcPrefix="ReqCtx_" compress="false">
      <Agent host="localhost" port="8800" />
      <PatternLayout pattern="%d [%p] %c %m%n"/>
      <!-- <RFC5424Layout enterpriseNumber="12293" includeMDC="true" mdcId="RequestContext" appName="GL" /> -->
    </Flume>
  </Appenders>
  <Loggers>
    <Root name="EventLogger" level="debug" additivity="true">
      <AppenderRef ref="flume" />
    </Root>
  </Loggers>
</Configuration>
Note that the port used to connect to Flume is 8800; if you have trouble connecting, it is worth checking whether that port is free in your set up.
I suggest you first verify that the logs are being generated, using a console appender, before proceeding to the next steps; a sketch follows. Please use only one appender at a time: I have noted that combining two appenders, such as console and Flume together, causes log4j to break. There seem to be a few bugs reported around this, but I have not reached a conclusion on it myself.
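A minimal console-only variant of log4j2.xml for that pre-check could look like the sketch below (the appender name and pattern are assumptions); swap the Flume appender back in once you are satisfied that the generator logs correctly.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="ConsoleCheck">
  <Appenders>
    <Console name="console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d [%p] %c %m%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="debug">
      <AppenderRef ref="console" />
    </Root>
  </Loggers>
</Configuration>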
Moving over to the Flume configuration, save the following in the conf folder under the Flume installation folder with the name flume-conf.properties:
agent.sources = avroSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
# For each one of the sources, the type is defined
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 8800
# The channel can be defined as follows.
agent.sources.avroSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
This assumes that Flume is listening on localhost and that the log generator runs on the same node as the Flume agent.
For the Hadoop set up, we assume you are using the same HDFS configuration that was created for the multi-node cluster set up in the earlier blog (Multi-node HDFS cluster).
Please make sure that the set up from the previous blog on Flume is actually working for you.
We are now all set to crank up the entire thing.
The first thing is to make sure that HDFS is running. Please use the jps command to confirm that HDFS is up; if not, start it with the ./start-all.sh command in $HADOOP_HOME/bin.
Once Hadoop is up and running, go to the Flume bin folder and start Flume with the following command:
./flume-ng agent --conf ../conf/ -f ../conf/flume-conf.properties -Dflume.root.logger=INFO,console -n agent
Once you see an entry like the one below in the console, you know Flume has started:
2015-02-08 21:42:31,079 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source avroSrc started
Please go through the logs printed in the console to check for any errors.
Once Flume is started, we are in a position to generate the logs. So we go back to Eclipse and ensure the following:
1. Verify that all the jars and libraries for Flume and log4j are present.
2. Convince yourself that log4j2.xml is on the classpath for the class. If it is not, put it there.
Right click on the LogGenerator source, go to 'Run As' and select 'Run Configurations'.
In the run configuration we need to do the following:
1. Under the arguments tab, put 100 in the program arguments. As the Java code shows, this produces 201 log entries (one info line at the start, plus a debug and an info line per iteration: 1 + 2 x 100 = 201).
Press 'Run', go back to the Flume terminal, and see whether entries similar to those below appear:
2015-02-08 19:49:21,271 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://PRUBNode1:9020/flume/events//FlumeData.1423405159441.tmp
2015-02-08 19:49:51,330 (hdfs-loggerSink-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:409)] Closing hdfs://PRUBNode1:9020/flume/events//FlumeData.1423405159441.tmp
2015-02-08 19:49:51,331 (hdfs-loggerSink-call-runner-8) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-02-08 19:49:51,398 (hdfs-loggerSink-call-runner-9) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:669)] Renaming hdfs://PRUBNode1:9020/flume/events/FlumeData.1423405159441.tmp to hdfs://PRUBNode1:9020/flume/events/FlumeData.1423405159441
Now you have good reason to believe that data is written to HDFS.
We now move to HDFS, and from $HADOOP_HOME/bin issue the following:
hadoop fs -ls /flume/events/*
This will list the files written to HDFS. You can use other fs commands to view the files, and so on. Since your local file system may not actually be very big, you may use the command below to clean up after each run:
hadoop fs -rm /flume/events/*
It is also handy to check the count of files: hadoop fs -ls /flume/events/* | wc -l
Troubleshooting
Case 1:
Now in my case, if I look very closely at the log, I see the following entry:
New I/O worker #2) [WARN - org.apache.flume.source.AvroSource.append(AvroSource.java:358)] Avro source avroSrc: Unable to process event. Exception follows.
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel}
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
at org.apache.flume.source.AvroSource.append(AvroSource.java:356)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
...
...
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
Now if I inspect the data dumped into HDFS, I can see that roughly 10% of the data has been lost compared to what was generated! You can check this very easily, since we know how many log entries we are expecting (201) and we can count how many are actually there.
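A quick, rough way to make that count is sketched below; it assumes the sink writes plain text files (for example with hdfs.fileType = DataStream set on the HDFS sink). With the default SequenceFile container the line count from wc -l is only indicative.
# count the log lines that actually reached HDFS; with an argument of 100 we expect 201
hadoop fs -cat /flume/events/FlumeData.* | wc -l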
Let us understand what is happening here.
The root cause of the issue seems to be that we are writing into the Flume channel faster than the sink can drain it. This fills up the channel, so giving the channel a bigger capacity may clear the issue.
It is notable that the originating program runs without any interruption, generating logs at an extremely high rate, whereas Flume is negotiating its handshake with Hadoop, and Hadoop is negotiating network bottlenecks while it performs the necessary replication across nodes.
If we look at the steps in the logs, we see that Flume performs a number of I/O intensive steps, such as creating a tmp file and then finalizing it. So it is natural that, under a continuous run, the sink is likely to fall behind the source. In many real situations, however, the source does not generate events continuously; there are likely to be intermittent gaps at random intervals in the flow of events. Those gaps may give the sink enough opportunity to catch up, provided the channel has enough capacity. So increasing capacity can be the first level strategy. Let us try it and see what happens.
In the Flume properties file I set the following entries:
agent.channels.memoryChannel.capacity = 500
agent.channels.memoryChannel.transactionCapacity = 200
In flume-env.sh I added the following, since a higher channel capacity will require more heap space:
JAVA_OPTS="-Xms100m -Xmx512m"
Once I make these changes, restart the Flume process and fire the logs again, the error appears to be resolved, and the files are now stored in full.
Our log generator does only one task, generating logs, and we can agree that hardly any real world application does this.
A real application spends time on other processing and, by that token, generates fewer log entries per unit of time. In particular, once code is in production, logging is usually reduced to WARN or INFO level, which produces much less output, and there will be gaps and cycles in the rate at which logs are generated.
So this fix may well work in the real world, but we now consider further complexity.
We do, however, need to model and try various capacities, tuning the channel capacity and other parameters.
Case 2:
There are situations, however, where we have to consider very high and continuous event generation, and if events are generated faster than the sink can consume them, we shall eventually end up in the same scenario.
We can simulate such a situation simply by increasing the number of iterations the loop runs, changing the argument to 10000 so that it runs for a longer duration, and by kicking off more than one process to put more load on Flume.
This will keep Flume busy enough that responses slow down and the exception is thrown as the channel fills up.
There are strategies, from simple to more complex, for handling such a situation.
1. Increase the keepAlive time: by default, if the channel is full, the channel processor keeps retrying for 3 seconds before the exception is thrown. In case of delayed responses due to overloaded processes, that keepAlive time may be exceeded and the exception thrown, so we can tune it up.
Following property can be added,-
agent.channels.memoryChannel.keepAlive = 30
2. Make the payload written per file bigger. This causes Flume to hold data and write it out in larger chunks. When writing to HDFS this can help, since HDFS is expected to perform better with a small number of large files than with a large number of small files. It also lets us take fuller advantage of the memory channel, as fewer I/O operations happen while the sink waits longer before writing. Of course, we have to keep the heap size in mind.
We are using the following additional configuration,-
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent.sinks.loggerSink.hdfs.rollInterval = 0
# File size to trigger roll, in bytes (0: never roll based on file size)
agent.sinks.loggerSink.hdfs.rollSize = 524288
#Number of events written to file before it rolled (0 = never roll based on number of events)
agent.sinks.loggerSink.hdfs.rollCount =0
The following are also added for additional capacity:
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 100
For additional heap space we have already provided the following setting in flume-env.sh:
JAVA_OPTS="-Xms100m -Xmx512m"
This successfully loads the logs into HDFS.
The HDFS storage now looks like the following:
parag@PRUBNode1:~/work/hadoop-1.2.1/bin$ hadoop fs -ls /flume/events/
Warning: $HADOOP_HOME is deprecated.
Found 12 items
-rw-r--r-- 2 parag supergroup 640216 2015-02-15 00:25 /flume/events/FlumeData.1423940101870
-rw-r--r-- 2 parag supergroup 639846 2015-02-15 00:25 /flume/events/FlumeData.1423940101871
-rw-r--r-- 2 parag supergroup 640000 2015-02-15 00:25 /flume/events/FlumeData.1423940101872
-rw-r--r-- 2 parag supergroup 639747 2015-02-15 00:25 /flume/events/FlumeData.1423940101873
-rw-r--r-- 2 parag supergroup 639495 2015-02-15 00:25 /flume/events/FlumeData.1423940101874
-rw-r--r-- 2 parag supergroup 639009 2015-02-15 00:25 /flume/events/FlumeData.1423940101875
-rw-r--r-- 2 parag supergroup 638991 2015-02-15 00:26 /flume/events/FlumeData.1423940101876
-rw-r--r-- 2 parag supergroup 639157 2015-02-15 00:26 /flume/events/FlumeData.1423940101877
-rw-r--r-- 2 parag supergroup 639762 2015-02-15 00:26 /flume/events/FlumeData.1423940101878
-rw-r--r-- 2 parag supergroup 638749 2015-02-15 00:26 /flume/events/FlumeData.1423940101879
-rw-r--r-- 2 parag supergroup 638597 2015-02-15 00:26 /flume/events/FlumeData.1423940101880
-rw-r--r-- 2 parag supergroup 0 2015-02-15 00:26 /flume/events/FlumeData.1423940101881.tmp
At this point we would like to see what the memory profile of the Flume process looks like.
No doubt this is a very healthy footprint, as we pass almost 638 MB through in 176 seconds (roughly 3.6 MB/s).
It looks like we have used about 75 MB, whereas we have allocated a maximum of 512 MB, so it is very much possible for this configuration to do heavier lifting.
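If you do not have JProfiler to hand, a rough view of the heap can also be had with the stock JDK tools; the sketch below is an alternative for checking heap usage, not what produced the numbers above.
# find the PID of the Flume agent (the flume-ng agent appears in jps as "Application")
jps
# print heap and garbage collection statistics for that PID every 5 seconds
jstat -gc <flume-pid> 5s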
Case 3:
Now we can turn our attention to load balancing, failure tolerance and so on. We define our problem set as follows:
1. For log4j to be able to submit the events, Flume needs to be available all the time; preventing any loss of availability will require redundancy on the Flume side.
2. Data needs to be delivered safely, so Flume needs a persistent channel.
3. Flume needs additional depth and should be able to hold data until it is safely handed over to HDFS.
The assumption here is that log4j delivers the payload reliably to the source.
So we propose the following layout, with chaining of Flume agents.
We prefer to have four property files and will be running four different JVMs.
Since we are using a 4 CPU PC with the HDFS master running on the same machine, I do not have ideal infrastructure for a high end configuration, although we shall be able to make do, as we have seen earlier.
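One piece the property files below do not cover is the client side. To make use of both collectors, the log4j2 Flume appender can list both collector ports; the first Agent is the primary and the second is used as a failover if the primary is unreachable. A minimal sketch of that appender section, assuming the collectors listen on ports 8800 and 8803 as configured below:
<Flume name="flume" ignoreExceptions="false" compress="false">
  <Agent host="localhost" port="8800" />
  <Agent host="localhost" port="8803" />
  <PatternLayout pattern="%d [%p] %c %m%n"/>
</Flume>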
The configuration files are given below.
In this topology we configure four agents, as follows.
Collector Agent 1: save the file as conf/flumeagentCollector1-conf.properties under the Flume install location.
##Collector 1 ########################################
agentCollector1.sources = avroSrc1
# For each one of the sources, the type is defined
# agent.sources.seqGenSrc.type = seq
agentCollector1.sources.avroSrc1.type = avro
agentCollector1.sources.avroSrc1.bind = localhost
agentCollector1.sources.avroSrc1.port = 8800
agentCollector1.sources.avroSrc1.batchSize = 10000
# The channel can be defined as follows.
agentCollector1.sources.avroSrc1.channels = fileChannel1
agentCollector1.sinks = avroSink1 avroSink2
agentCollector1.sinks.avroSink1.type = avro
agentCollector1.sinks.avroSink1.hostname=localhost
agentCollector1.sinks.avroSink1.port=8801
agentCollector1.sinks.avroSink1.channel = fileChannel1
agentCollector1.sinks.avroSink1.serializer=HEADER_AND_TEXT
agentCollector1.sinks.avroSink2.type = avro
agentCollector1.sinks.avroSink2.hostname=localhost
agentCollector1.sinks.avroSink2.port=8802
agentCollector1.sinks.avroSink2.channel = fileChannel1
agentCollector1.sinks.avroSink2.serializer=HEADER_AND_TEXT
agentCollector1.sinkgroups=sinkgroup1
agentCollector1.sinkgroups.sinkgroup1.sinks=avroSink1 avroSink2
agentCollector1.sinkgroups.sinkgroup1.processor.type=failover
agentCollector1.sinkgroups.sinkgroup1.processor.priority.avroSink1=01
agentCollector1.sinkgroups.sinkgroup1.processor.priority.avroSink2=02
agentCollector1.sinkgroups.sinkgroup1.processor.maxpenalty=30000
agentCollector1.channels = fileChannel1
# Each channel's type is defined.
agentCollector1.channels.fileChannel1.type = FILE
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the checkpoint/data locations and capacity of the file channel
agentCollector1.channels.fileChannel1.checkpointDir=/home/parag/work/flume/fileChannel1/checkpoint
agentCollector1.channels.fileChannel1.dataDirs=/home/parag/work/flume/fileChannel1/data
agentCollector1.channels.fileChannel1.capacity = 1000
agentCollector1.channels.fileChannel1.transactionCapacity = 400
agentCollector1.channels.fileChannel1.keepAlive = 30
##Collector 1 end##################################
Collector Agent 2: save the file as conf/flumeagentCollector2-conf.properties under the Flume install location.
##Collector 2####################################
agentCollector2.sources = avroSrc2
# For each one of the sources, the type is defined
# agent.sources.seqGenSrc.type = seq
agentCollector2.sources.avroSrc2.type = avro
agentCollector2.sources.avroSrc2.bind = localhost
agentCollector2.sources.avroSrc2.port = 8803
agentCollector2.sources.avroSrc2.batchSize = 10000
# The channel can be defined as follows.
agentCollector2.sources.avroSrc2.channels = fileChannel2
agentCollector2.sinks = avroSink1 avroSink2
agentCollector2.sinks.avroSink1.type = avro
agentCollector2.sinks.avroSink1.hostname=localhost
agentCollector2.sinks.avroSink1.port=8801
agentCollector2.sinks.avroSink1.channel = fileChannel2
agentCollector2.sinks.avroSink1.sink.serializer=HEADER_AND_TEXT
agentCollector2.sinks.avroSink2.type = avro
agentCollector2.sinks.avroSink2.hostname=localhost
agentCollector2.sinks.avroSink2.port=8802
agentCollector2.sinks.avroSink2.channel = fileChannel2
agentCollector2.sinks.avroSink2.sink.serializer=HEADER_AND_TEXT
agentCollector2.sinkgroups=sinkgroup1
agentCollector2.sinkgroups.sinkgroup1.sinks=avroSink1 avroSink2
agentCollector2.sinkgroups.sinkgroup1.processor.type=failover
agentCollector2.sinkgroups.sinkgroup1.processor.priority.avroSink1=02
agentCollector2.sinkgroups.sinkgroup1.processor.priority.avroSink2=01
agentCollector2.sinkgroups.sinkgroup1.processor.maxpenalty=30000
agentCollector2.channels = fileChannel2
# Each channel's type is defined.
agentCollector2.channels.fileChannel2.type = FILE
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the checkpoint/data locations and capacity of the file channel
agentCollector2.channels.fileChannel2.checkpointDir=/home/parag/work/flume/fileChannel2/checkpoint
agentCollector2.channels.fileChannel2.dataDirs=/home/parag/work/flume/fileChannel2/data
agentCollector2.channels.fileChannel2.capacity = 1000
agentCollector2.channels.fileChannel2.transactionCapacity = 100
agentCollector2.channels.fileChannel2.keepAlive = 30
##Collector 2 end ################################
Collector to HDFS bridge 1: save the file as conf/flumebridgeagent1-conf.properties under the Flume install location.
##Bridge 1####################################
bridgeagent1.sources = avroSrc3
bridgeagent1.sources.avroSrc3.type = avro
bridgeagent1.sources.avroSrc3.bind = localhost
bridgeagent1.sources.avroSrc3.port = 8801
bridgeagent1.sources.avroSrc3.batchSize = 10000
bridgeagent1.sources.avroSrc3.channels = memoryChannel1
bridgeagent1.channels = memoryChannel1
bridgeagent1.sinks = loggerSink1
bridgeagent1.sinks.loggerSink1.type = hdfs
bridgeagent1.sinks.loggerSink1.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
bridgeagent1.sinks.loggerSink1.hdfs.rollInterval = 0
bridgeagent1.sinks.loggerSink1.hdfs.rollSize = 524288
bridgeagent1.sinks.loggerSink1.hdfs.rollCount =0
bridgeagent1.sinks.loggerSink1.sink.serializer = HEADER_AND_TEXT
bridgeagent1.sinks.loggerSink1.channel = memoryChannel1
bridgeagent1.channels.memoryChannel1.type = memory
bridgeagent1.channels.memoryChannel1.capacity = 1000
bridgeagent1.channels.memoryChannel1.transactionCapacity = 100
bridgeagent1.channels.memoryChannel1.keepAlive = 30
##Bridge 1 end ################################
Collector to HDFS bridge 2: save the file as conf/flumebridgeagent2-conf.properties under the Flume install location.
##Bridge 2####################################
bridgeagent2.sources = avroSrc4
bridgeagent2.sources.avroSrc4.type = avro
bridgeagent2.sources.avroSrc4.bind = localhost
bridgeagent2.sources.avroSrc4.port = 8802
bridgeagent2.sources.avroSrc4.batchSize = 10000
bridgeagent2.sources.avroSrc4.channels = memoryChannel2
bridgeagent2.channels = memoryChannel2
bridgeagent2.sinks = loggerSink2
bridgeagent2.sinks.loggerSink2.type = hdfs
bridgeagent2.sinks.loggerSink2.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
bridgeagent2.sinks.loggerSink2.hdfs.rollInterval = 0
bridgeagent2.sinks.loggerSink2.hdfs.rollSize = 524288
bridgeagent2.sinks.loggerSink2.hdfs.rollCount =0
bridgeagent2.sinks.loggerSink2.sink.serializer = HEADER_AND_TEXT
bridgeagent2.sinks.loggerSink2.channel = memoryChannel2
bridgeagent2.channels.memoryChannel2.type = memory
bridgeagent2.channels.memoryChannel2.capacity = 1000
bridgeagent2.channels.memoryChannel2.transactionCapacity = 100
bridgeagent2.channels.memoryChannel2.keepAlive = 30
##Bridge 2 end ################################
Let us now focus on the nature of the processes we can observe. We shall gradually increase the load and see how our system behaves. Before that, though, we need to set up Ganglia to monitor the processes.
Ganglia
Now it is time to set up Ganglia.
For the Ganglia set up, I shall point you to the following blog, which is very good and helped me set up Ganglia very easily:
https://www.digitalocean.com/community/tutorials/introduction-to-ganglia-on-ubuntu-14-04.
The basic commands are as follows.
Install Ganglia:
sudo apt-get install -y ganglia-monitor rrdtool gmetad ganglia-webfrontend
Use the following to copy the Ganglia Apache configuration into the Apache sites-enabled folder:
sudo cp /etc/ganglia-webfrontend/apache.conf /etc/apache2/sites-enabled/ganglia.conf
Go to the /etc/ganglia folder and update the following sections in gmond.conf as shown below:
cluster {
name = "prcluster"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#mcast_join = 239.2.11.71
host = localhost
port = 8649
ttl = 1
}
udp_recv_channel {
#mcast_join = 239.2.11.71
port = 8649
#bind = 239.2.11.71
}
Go to the /etc/ganglia folder and update the following section in gmetad.conf as shown below:
data_source "prcluster" 10 localhost
Restart Ganglia
sudo service ganglia-monitor restart && sudo service gmetad restart && sudo service apache2 restart
Starting the Flume agents
HDFS needs to be started first, and the following Flume agents should then be started in the order given:
./flume-ng agent --conf ../conf/ -f ../conf/flumebridgeagent2-conf.properties -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n bridgeagent2
./flume-ng agent --conf ../conf/ -f ../conf/flumebridgeagent1-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n bridgeagent1
./flume-ng agent --conf ../conf/ -f ../conf/flumeagentCollector2-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n agentCollector2
./flume-ng agent --conf ../conf/ -f ../conf/flumeagentCollector1-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n agentCollector1
The following options enable Flume to report metrics to Ganglia:
-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649
Monitoring in Ganglia
Graphs will appear in Ganglia if localhost is selected in the drop down at the top.
We shall now inspect the results.
It is notable that, at this load, we do not see any capacity spill over to the redundant route we have provided; only one collector and one bridge are used.
In the next step we start scaling up the load on the set up by launching parallel processes. In my case I had launched 30 parallel processes before I could see the first two agents reaching full capacity and the redundant channels being used; a launch sketch is given below.
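One way to launch many generator instances at once is sketched below; the classpath entries are assumptions for your own project layout, and the argument of 10000 matches the longer runs used above.
# launch 30 parallel copies of the log generator (classpath entries are assumptions)
N=30
for i in $(seq 1 $N); do
  java -cp "bin:config:lib/*" pr.edu.loggen.LogGenerator 10000 &
done
wait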
Below are the metrics of the first two agents at full capacity.
We can see that the channels filled up for collector agent 1 and bridge agent 1.
CPU load and network load also show peaks during that time.
The following graphs show the activity in collector agent 2 and bridge agent 2.
This demonstrates the load balancing and horizontal scaling capability built into Flume.