Total Pageviews

Friday, 6 March 2015

Introduction to Data Science Part 9: Flume advanced concepts- case study

Introduction to Data Science Part 9: Flume advanced concepts

Parag Ray
7-Feb-2015

Introduction

Welcome to the readers!

This is the ninth part of the series of articles. Here we are looking into the needs of uploading data into HDFS or data acquisition tools. We shall be exploring flume tuning as part of the strategy.

If you have not gone through already, you would need to go back to eight part (flume introduction) & fourth part where multi node set up(see related readings) is discussed, as this post will depend on the multi node set up and flume introduction.
Please note the related readings and target audience section to get help to better follow the blog. 

We are using the operating system Ubuntu 14.04. Hadoop version 1.2.1 and Flume version 1.5.2. java version 1.7.

Disclaimer
I do not represent any particular organization in providing this details here.Implementation of tools and techniques discussed in this presentation should be done by responsible Implementer, capable of undertaking such tasks.

Agenda
  • Target audience
  • Related readings/other blogs 
  • Overall objective of case study
  • Create test infrastructure with log4j flume appender 
  • Using the test infrastructure to simulate capacity over flow situation
  • Case study resolution of capacity issues
  • Create tiered topology of chained flume agents to create redundancy and load balancing
  • Use Ganglia to monitor the load behaviour.
Target audience

  • This is an intermediate level discussion on Hadoop and related tools.
  • Best suited for audience who are looking for introduction to this technology.
  • Prior knowledge in java and Linux required.
  • Intermediate level understanding of networking necessary.
Related readings/other blogs  
This is the eighth article of this series or blogs, other article shortcuts are available in the pages tab.
You would also like to look at Flume, Cloudera  & Hadoop home page for further details.
  
Overall objective of case study

Objectives of this blog is primarily to expose flume to varied degree of stress and see how flume behave under varied stress as a java process. Real world application will require various load levels and complexity and the strategy to have a test bet which will allow simulation of stress levels will definitely help design decisions.
  • To capture logs log4j appender and start putting them into Hadoop. 
  • To establish the flexibility of flume,we need to inspect some types of frequent issues and try to over come these problems.
    To keep our focus on flume , we shall create a java application which is simple but can generate varying volume of log4j entries as we need.
  • This application will be a small stand alone application, which will create log4j entries and using the log4j2 flume appender, we shall capture the logs directly.
    We shall observer the behavior of this process and see if the log capturing is robust and see the behavior against increased load. As the problems are identified we shall try to fix these. 
  • We shall try to analyze the flume process with profiler and see how the memory profile is revealing about the foot print and resource intensity of Flume.
  • We shall try to see if capacity bottlenecks can be overcome by chaining flume agents.
  • We shall monitor the high load behavior of flume with Ganglia. 

Creating test bed
 

We shall use the following tools to create application, we expect you to already have them , if not please set up these,-
  • Eclipse ( we are using  Luna Service Release 1 (4.4.1) you can use any of the recent versions
  • Log4j 2.1
  • Jprofiler 8.1.2 ( use can use it for 10 days free, if you are concerned about Jprofiler being commercial profiler), you can use other profiler but we shall cover Jprofiler here.
  • Flume 1.5.2 please down load and extract the tar archive, for detailed set up please refer to earlier blog on flume set up and initial run sample.
 
Creating source application
Now in eclipse you can create a java project and include the log4j libraries as well as flume libraries other than the defaults that you shall any way have for log4j.
You can ignore the source and java doc jars , but I suggest to have them as part of the project.


create a package pr.edu.loggen and create a java class in the package named LogGenerator.java

For Flume jars, please refer to the lib folder of fume directory.
Copy the source below,-

package pr.edu.loggen;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.sql.SQLException;
import java.util.*;

public class LogGenerator{

static Logger log = LogManager.getLogger(LogGenerator.class.getName());
private static long itercnt=0;

public static void main(String[] args) {
// TODO Auto-generated method stub
itercnt = Long.parseLong(args[0]);
System.out.println(itercnt);
long i=0;
log.info("starting log generation till i=" + itercnt);
for (i=0;i<itercnt;i++)
{log.debug("Hello this is an debug message i "+ i);
    log.info("Hello this is an info message i "+ i);
}
System.out.println(itercnt);
}

}
The class simply takes a number as argument and generates log that many times in a loop. We can run it for various numbers and also create parallel instances to increase the load many folds as we need.



Next would be the log4j configuration file which is an xml.
You have a folder like config and put the file log4j2.xml there. 


Please refer the folder structure showing the class file and log4j config file

 


 You can use the log4j config as below

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug" name="Flume" verbose="false" monitorInterval="30">
<MarkerFilter marker="EVENT" onMatch="ACCEPT" onMismatch="NEUTRAL" />
<Appenders>
<Flume name="flume" ignoreExceptions="false" mdcPrefix="ReqCtx_" compress="false">
<Agent host="localhost" port="8800" />
<PatternLayout pattern="%d [%p] %c %m%n"/>
<!-- <RFC5424Layout enterpriseNumber="12293" includeMDC="true" mdcId="RequestContext" appName="GL" /> -->
</Flume>
</Appenders>
<Loggers>
<Root name="EventLogger" level="debug" additivity="true">
<AppenderRef ref="flume" />
</Root>
</Loggers>
</Configuration>

Note here the port used to connect to flume is 8800 , in case you have trouble connecting , you might be intersted to check if the port is free in your set up.
I suggest you try to check if the logs are being generated using console appender before proceeding to next steps. Please use only one appender, as I have noted that using dual appender together with Flume like console & Flume causes the log4j to break. There seems to be a few bugs reported, but i have not concluded on this myself.

Moving over to flume config, save this in conf folder under the flume installation folder with name flume-conf.properties-

agent.sources = avroSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
# For each one of the sources, the type is defined
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 8800
# The channel can be defined as follows.
agent.sources.avroSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

This assumes that flume is listening to local host and the log generator will be running in the same node as that of the flume agent
.

For Hadoop set up, we assume, you are using the same set up as it was created in the HDFS configuration used for multi-node cluster set up in previous blog (Multi-node HDFS cluster).
Please make sure that the previous blog on flume is actually working for you.

We are now all set to crank up the entire thing.

First thing to make sure that HDFS is running. Please use jps command to make sure that hdfs is up , if not, please start it up withh ./start-all.sh command in $HADOOP_HOME/bin.

Once Hadoop is up and running you must go to flume bin folder and start up flume with the following command,-

./flume-ng agent --conf ../conf/ -f ../conf/flume-conf.properties -Dflume.root.logger=INFO,console -n agent

Once you see an entry in the console as below you know flume is started,-

2015-02-08 21:42:31,079 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source avroSrc started

Please go trough the logs printed in teh console to check in case of any error.

Once Flume is started , we are in a position to generate the logs. So we go to the Eclipse,

we ensure the following,-

1.Verify all the jars and libraries for flume and log 4j are present.
2. Convince yourself that log4j2 xml is in path for the class. If not put it here.
 


Right click on LogGenerator source and go to 'Run As' and select run configuration.
In the run configuration we need to do the following,-

1. In the program arguments put 100 under arguments tab.This ensures logs are written 201 times if you have read the java code.
Press 'Run' and go back to flume terminal,and see if entries similar to those below are appearing,-

2015-02-08 19:49:21,271 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://PRUBNode1:9020/flume/events//FlumeData.1423405159441.tmp
2015-02-08 19:49:51,330 (hdfs-loggerSink-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:409)] Closing hdfs://PRUBNode1:9020/flume/events//FlumeData.1423405159441.tmp
2015-02-08 19:49:51,331 (hdfs-loggerSink-call-runner-8) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-02-08 19:49:51,398 (hdfs-loggerSink-call-runner-9) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:669)] Renaming hdfs://PRUBNode1:9020/flume/events/FlumeData.1423405159441.tmp to hdfs://PRUBNode1:9020/flume/events/FlumeData.1423405159441

Now you have good reason to believe that data is written to HDFS.

We move to HDFS.

And from $HADOOP_HOME/bin issue the following

hadoop fs -ls /flume/events/* 


This will list the files written in HDFS. You can use other FS command to view the files and etc. Since your local file system may not actually be very big, you may use the command as under to clean it up after each run,-
 

hadoop fs -rm /flume/events/*

It might be handy to check the count of the files, hadoop fs -ls /flume/events/*|wc -l


Trouble shooting

Case 1::
Now in my case, if look very closely to the log, i see the following entry,-

New I/O  worker #2) [WARN - org.apache.flume.source.AvroSource.append(AvroSource.java:358)] Avro source avroSrc: Unable to process event. Exception follows.
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel}
    at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
    at org.apache.flume.source.AvroSource.append(AvroSource.java:356)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
...
...
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)

Now if I inspect the data dumped in HDFS, I can be very sure that there is a loss of 10% of data compared to what was generated!  You can check it very easily as  we know how many (201) log entries we are expecting and we can see ow many are there.

Let us understand what is happening here.

The root cause of the issue seems to be that we are writing into the flume channel faster than rate at which sink can clean it out.This fills up the channel.So creating a bigger capacity in the channel may clear the issue.

It is notable here that the originator program here running without any interruption & generating log at a extremely high rate; where as flume is negotiating handshaking with hadoop, and hadoop is negotiating network bottleneck when it is doing the necessary replication across nodes. 


If we look at the steps in the logs,we see, flume is doing a lot of i/o intensive steps  like creating tmp file then finalizing it etc. So it is natural that in this case at continuous run , sink is likely to be falling behind the source. In many of the actual situation, however, source do not continuously generate events. There is likely to be intermittent gaps at random intervals in the flow of event generation. This may give enough of opportunity for the sink to catch up, provided we have enough capacity in channel. So this can be first level strategy. Let us try doing it and see what happens. 

In the flume properties file I had the following entries
agent.channels.memoryChannel.capacity = 500
agent.channels.memoryChannel.transactionCapacity = 200

And in flume-env.sh added the following as higher capacity will require bigger heap space.
JAVA_OPTS="-Xms100m -Xmx512m"

Once i do this and restart the flume process an fire the logs again, the error seems to have been resolved.And the files are now fully stored.

In our log generator we have been only doing one task and that is to generate logs and we do agree that hardly any real world applications do This. 

This would mean  that the task would involve other processing and by that token they generates less log per unit of time. Especially , once code is in production, logs are reduced to WARN or INFO level which gives much less log. And there will be gaps and cycles in the speed in which the logs will be generated.  

So this fix may work in real world , but we now consider further complexity. 
We do need however to model and try various capacities and tune the capacity of the channel and other parameters.

Case 2::
There are situations however, where it is necessary to consider very high and continuous event generation sinreario and if the events are generated faster than they are consumed by sink , we shall eventually end up in the same scenario.

We can simulate such situation by simply increasing the number of iteration the loop runs by changing the argument to 10000 so that it runs for longer duration and kicking up more than one process causing more load to flume.

This Will cause the flume to be engaged enough so that response will be slow and exception will be thrown as channel fills up.

There could be simple to more complex strategies to handle such situation.

1. Increase keepAlive time: By default if the channel is full, Channel processor keeps on retrying for 3 seconds before exception is thrown, However in case of delayed response due to over loaded processes the keepAlive time may get exceeded and the  exception thrown. We can tune it up.
Following property can be added,-
agent.channels.memoryChannel.keepAlive = 30

2. Try to make the payload for the events bigger. This will cause the flume hold up some data and then write together. In case we are writing to HDFS, this may help as HDFS is expected to perform better with large but small number of files compared to small but large number of files.This way we are able to take full advantage of the memmory channel also as less number of I/O is happening as for sink to write it is waiting longer. Ofcourse, we have to keep in mind the heap size.

We are using the following additional configuration,-

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)                                                                                                                                             
agent.sinks.loggerSink.hdfs.rollInterval = 0
# File size to trigger roll, in bytes (0: never roll based on file size)                                                                                                                                                                    
agent.sinks.loggerSink.hdfs.rollSize = 524288
#Number of events written to file before it rolled (0 = never roll based on number of events)                                                                                                                                               
agent.sinks.loggerSink.hdfs.rollCount =0


Also the following are added for additional capacity,-

agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 100

For additional heap size we have already provided setting in flume-env.sh as under,-
And in flume-env.sh added the following
JAVA_OPTS="-Xms100m -Xmx512m" 


This sucessfully loads the logs to HDFS.

HDFS storage looks like the following,-
parag@PRUBNode1:~/work/hadoop-1.2.1/bin$ hadoop fs -ls /flume/events/
Warning: $HADOOP_HOME is deprecated.

Found 12 items
-rw-r--r--   2 parag supergroup     640216 2015-02-15 00:25 /flume/events/FlumeData.1423940101870
-rw-r--r--   2 parag supergroup     639846 2015-02-15 00:25 /flume/events/FlumeData.1423940101871
-rw-r--r--   2 parag supergroup     640000 2015-02-15 00:25 /flume/events/FlumeData.1423940101872
-rw-r--r--   2 parag supergroup     639747 2015-02-15 00:25 /flume/events/FlumeData.1423940101873
-rw-r--r--   2 parag supergroup     639495 2015-02-15 00:25 /flume/events/FlumeData.1423940101874
-rw-r--r--   2 parag supergroup     639009 2015-02-15 00:25 /flume/events/FlumeData.1423940101875
-rw-r--r--   2 parag supergroup     638991 2015-02-15 00:26 /flume/events/FlumeData.1423940101876
-rw-r--r--   2 parag supergroup     639157 2015-02-15 00:26 /flume/events/FlumeData.1423940101877
-rw-r--r--   2 parag supergroup     639762 2015-02-15 00:26 /flume/events/FlumeData.1423940101878
-rw-r--r--   2 parag supergroup     638749 2015-02-15 00:26 /flume/events/FlumeData.1423940101879
-rw-r--r--   2 parag supergroup     638597 2015-02-15 00:26 /flume/events/FlumeData.1423940101880
-rw-r--r--   2 parag supergroup          0 2015-02-15 00:26 /flume/events/FlumeData.1423940101881.tmp


At this point we would like to see how memory profile of flume process look like.


No doubt this is a very healthy profile foot print as we pass on almost 638 mb in 176 seconds.
It looks like we have used 75 mb where as we have allocated 512mb max. So it is very much possible for this configuration to do more heavy lifting.



Case 3::


Now we can turn our attention to load balancing and failure tolerance etc. We Define our problem set as under,-

1. For log4j to be able to submit the events, flume needs to be available all the time, prevention of any loss of availability will require redundancy on flume side.
2.There needs to be safe delivery of data so flume needs to have persistent channel.
3.Flume needs to have additional depth and should be able to hold data till it is safely handed over to HDFS.

Assumption here is that log4j is reliable in delivery of the payload to source.

So we propose the following lay out with chaining of flume agents.


We prefer to have four property and will be having four different JVM.

Since we are using 4 CPU PC with HDFS master running on the same PC, I do not have ideal infrastructure for a high configuration ,although we shall be able to make do as we have seen earlier.

Below are the config files.

In the above topology , we are configuring 4 agents as under,-

Collector Agent 1 save the file as conf/flumeagentCollector1-conf.properties under flume install location.
##Collector 1  ########################################
agentCollector1.sources = avroSrc1
# For each one of the sources, the type is defined
# agent.sources.seqGenSrc.type = seq
agentCollector1.sources.avroSrc1.type = avro
agentCollector1.sources.avroSrc1.bind = localhost
agentCollector1.sources.avroSrc1.port = 8800
agentCollector1.sources.avroSrc1.batchSize = 10000

# The channel can be defined as follows.
agentCollector1.sources.avroSrc1.channels = fileChannel1

agentCollector1.sinks = avroSink1 avroSink2
agentCollector1.sinks.avroSink1.type = avro
agentCollector1.sinks.avroSink1.hostname=localhost                                                                                                                                            
agentCollector1.sinks.avroSink1.port=8801                                                                                                                                                                
agentCollector1.sinks.avroSink1.channel = fileChannel1
agentCollector1.sinks.avroSink1.serializer=HEADER_AND_TEXT
                                                                                                                                    
agentCollector1.sinks.avroSink2.type = avro
agentCollector1.sinks.avroSink2.hostname=localhost                                                                                                                                             
agentCollector1.sinks.avroSink2.port=8802                                                                                                                                                                 
agentCollector1.sinks.avroSink2.channel = fileChannel1
agentCollector1.sinks.avroSink2.serializer=HEADER_AND_TEXT
                                                                                                                                            
agentCollector1.sinkgroups=sinkgroup1
agentCollector1.sinkgroups.sinkgroup1.sinks=avroSink1 avroSink2
agentCollector1.sinkgroups.sinkgroup1.processor.type=failover
agentCollector1.sinkgroups.sinkgroup1.processor.priority.avroSink1=01
agentCollector1.sinkgroups.sinkgroup1.processor.priority.avroSink2=02
agentCollector1.sinkgroups.sinkgroup1.processor.maxpenality=30000
                          
agentCollector1.channels = fileChannel1

# Each channel's type is defined.

agentCollector1.channels.fileChannel1.type = FILE
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agentCollector1.channels.fileChannel1.checkpointDir=/home/parag/work/flume/fileChannel1/checkpoint
agentCollector1.channels.fileChannel1.dataDirs=/home/parag/work/flume/fileChannel1/data

agentCollector1.channels.fileChannel1.capacity = 1000
agentCollector1.channels.fileChannel1.transactionCapacity = 400
agentCollector1.channels.fileChannel1.keepAlive = 30


##Collector 1 end##################################

Collector Agent 2 save the file as conf/flumeagentCollector2-conf.properties
under flume install location.

##Collector 2####################################
agentCollector2.sources = avroSrc2
# For each one of the sources, the type is defined
# agent.sources.seqGenSrc.type = seq
agentCollector2.sources.avroSrc2.type = avro
agentCollector2.sources.avroSrc2.bind = localhost
agentCollector2.sources.avroSrc2.port = 8803
agentCollector2.sources.avroSrc2.batchSize = 10000
# The channel can be defined as follows.
agentCollector2.sources.avroSrc2.channels = fileChannel2

agentCollector2.sinks = avroSink1 avroSink2
agentCollector2.sinks.avroSink1.type = avro
agentCollector2.sinks.avroSink1.hostname=localhost                                                                                                                                            
agentCollector2.sinks.avroSink1.port=8801                                                                                                                                                                
agentCollector2.sinks.avroSink1.channel = fileChannel2
agentCollector2.sinks.avroSink1.sink.serializer=HEADER_AND_TEXT
                                                                                                                                           
agentCollector2.sinks.avroSink2.type = avro
agentCollector2.sinks.avroSink2.hostname=localhost                                                                                                                                             
agentCollector2.sinks.avroSink2.port=8802                                                                                                                                                                 
agentCollector2.sinks.avroSink2.channel = fileChannel2
agentCollector2.sinks.avroSink2.sink.serializer=HEADER_AND_TEXT
                                                                                                                                               agentCollector2.sinkgroups=sinkgroup1
agentCollector2.sinkgroups.sinkgroup1.sinks=avroSink1 avroSink2
agentCollector2.sinkgroups.sinkgroup1.processor.type=failover
agentCollector2.sinkgroups.sinkgroup1.processor.priority.avroSink1=02
agentCollector2.sinkgroups.sinkgroup1.processor.priority.avroSink2=01
agentCollector2.sinkgroups.sinkgroup1.processor.maxpenality=30000
                          
agentCollector2.channels = fileChannel2

# Each channel's type is defined.

agentCollector2.channels.fileChannel2.type = FILE
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agentCollector2.channels.fileChannel2.checkpointDir=/home/parag/work/flume/fileChannel2/checkpoint
agentCollector2.channels.fileChannel2.dataDirs=/home/parag/work/flume/fileChannel2/data

agentCollector2.channels.fileChannel2.capacity = 1000
agentCollector2.channels.fileChannel2.transactionCapacity = 100
agentCollector2.channels.fileChannel2.keepAlive = 30


##Collector 2 end  ################################

Collector to HDFS bridge 1 save the file as conf/flumebridgeagent1-conf.properties under flume install location.

##Bridge 1####################################
bridgeagent1.sources = avroSrc3
bridgeagent1.sources.avroSrc3.type = avro
bridgeagent1.sources.avroSrc3.bind = localhost
bridgeagent1.sources.avroSrc3.port = 8801
bridgeagent1.sources.avroSrc3.batchSize = 10000
bridgeagent1.sources.avroSrc3.channels = memoryChannel1
bridgeagent1.channels = memoryChannel1
bridgeagent1.sinks = loggerSink1
bridgeagent1.sinks.loggerSink1.type = hdfs
bridgeagent1.sinks.loggerSink1.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
                                                                                                                                          
bridgeagent1.sinks.loggerSink1.hdfs.rollInterval = 0
                                                                                                                                                                    
bridgeagent1.sinks.loggerSink1.hdfs.rollSize = 524288
                                                                                                                                            
bridgeagent1.sinks.loggerSink1.hdfs.rollCount =0
bridgeagent1.sinks.loggerSink1.sink.serializer = HEADER_AND_TEXT
                                                                                                                                                                      
bridgeagent1.sinks.loggerSink1.channel = memoryChannel1
bridgeagent1.channels.memoryChannel1.type = memory
bridgeagent1.channels.memoryChannel1.capacity = 1000
bridgeagent1.channels.memoryChannel1.transactionCapacity = 100
bridgeagent1.channels.memoryChannel1.keepAlive = 30


##Bridge 1 end  ################################

Collector to HDFS bridge 2 save the file as conf/flumebridgeagent2-conf.properties under flume install location.

##Bridge 2####################################
bridgeagent2.sources = avroSrc4
bridgeagent2.sources.avroSrc4.type = avro
bridgeagent2.sources.avroSrc4.bind = localhost
bridgeagent2.sources.avroSrc4.port = 8802
bridgeagent2.sources.avroSrc4.batchSize = 10000
bridgeagent2.sources.avroSrc4.channels = memoryChannel2
bridgeagent2.channels = memoryChannel2
bridgeagent2.sinks = loggerSink2
bridgeagent2.sinks.loggerSink2.type = hdfs
bridgeagent2.sinks.loggerSink2.hdfs.path =hdfs://PRUBNode1:9020/flume/events/
                                                                                                                                          
bridgeagent2.sinks.loggerSink2.hdfs.rollInterval = 0
                                                                                                                                                                    
bridgeagent2.sinks.loggerSink2.hdfs.rollSize = 524288
                                                                                                                                            
bridgeagent2.sinks.loggerSink2.hdfs.rollCount =0
bridgeagent2.sinks.loggerSink2.sink.serializer = HEADER_AND_TEXT
                                                                                                                                                                      
bridgeagent2.sinks.loggerSink2.channel = memoryChannel2
bridgeagent2.channels.memoryChannel2.type = memory
bridgeagent2.channels.memoryChannel2.capacity = 1000
bridgeagent2.channels.memoryChannel2.transactionCapacity = 100
bridgeagent2.channels.memoryChannel2.keepAlive = 30


##Bridge 2 end  ################################


Let us now focus on what is the nature of processes that we can see. We shall be gradually increasing load and see how does our system behave. But before that we need to set up the Ganglia to monitor the processes.

Ganglia
Now it is our time to set up ganglia.

For Ganglia set up , I shall ask you to refer to the following blog which is very good and helped me set up ganglia very easily

https://www.digitalocean.com/community/tutorials/introduction-to-ganglia-on-ubuntu-14-04.

Following are the basic commands,-
Install Ganglia
sudo apt-get install -y ganglia-monitor rrdtool gmetad ganglia-webfrontend

Use following to copy apache configuration to ganglia configur
sudo cp /etc/ganglia-webfrontend/apache.conf /etc/apache2/sites-enabled/ganglia.conf

Go to /etc/ganglia folder to updated the following sections in gmond.conf as shown in bold and italics

cluster {
  name = "prcluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel {
  #mcast_join = 239.2.11.71
  host = localhost
  port = 8649
  ttl = 1
}

udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
}

Go to /etc/ganglia folder to updated the following sections in gmetad.conf as shown in bold and italics
data_source "prcluster" 10 localhost

Restart Ganglia
sudo service ganglia-monitor restart && sudo service gmetad restart && sudo service apache2 restart


Starting flume service

The HDFS needs to be started first and the following flume agents should be started in the given order.

./flume-ng agent --conf ../conf/ -f ../conf/flumebridgeagent2-conf.properties -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n bridgeagent2
 

./flume-ng agent --conf ../conf/ -f ../conf/flumebridgeagent1-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n bridgeagent1

./flume-ng agent --conf ../conf/ -f ../conf/flumeagentCollector2-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n agentCollector2
 

./flume-ng agent --conf ../conf/ -f ../conf/flumeagentCollector1-conf.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649 -n agentCollector1

The following option enables flume to report metrices to ganglia
-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=localhost:8649

Monitoring in ganglia


Graphs will appear in Ganglia if the local host is selected at the top drop down.
We shall now inspect the results.



It is notable that we do not see any capacity spill over to the redundant route we have provided and only one collector and one bridge used.

Now in the next step we start scaling up the load on the set up by launching parallel processes. In my case I have launched 30 parallel processes before  I could see that first two agents getting into full capacity and redundant channels were used.

we have below the matrices of the first two agents in full capacity


We can see that the channels filled up for collector agent 1 and bridge agent 1
CPU load and the network load also shows peak during the time.

The Following graphs show the activity in  the collector agent 2 and bridge agent 2


This shows the load balancing and horizontal scaling capability built in in flume.
 

Saturday, 29 November 2014

Introduction to Data science Part 8:Uploading data with flume

Introduction to Data science Part 8:Uploading data with flume

Parag Ray
29-Nov-2014

Introduction

Welcome to the readers!

This is the eighth part of the series of articles. Here we are looking into the needs of uploading data into HDFS or data acquisition tools. We shall be exploring flume as part of the strategy.

If you have not gone through already, you would need to go back to fourth part where multi node set up(see related readings) is discussed, as this post will depend on the multi node set up.
Please note the related readings and target audience section to get help to better follow the blog. 

We are using the operating system Ubuntu 14.04. Hadoop version 1.2.1 and Flume version 1.5.2. java version 1.7.

Disclaimer
I do not represent any particular organization in providing this details here.Implementation of tools and techniques discussed in this presentation should be done by responsible Implementer, capable of undertaking such tasks.

Agenda
  • Target audience
  • Related readings/other blogs
  • Requirements of data acquisition strategy
  • Flume introduction 
  • Flume set up
  • Test
Target audience

  • This is an intermediate level discussion on Hadoop and related tools.
  • Best suited for audience who are looking for introduction to this technology.
  • Prior knowledge in java and Linux required.
  • Intermediate level understanding of networking necessary.
Related readings/other blogs  
This is the eighth article of this series or blogs, other article shortcuts are available in the pages tab.
You would also like to look at Flume, Cloudera  & Hadoop home page for further details.

Requirements of data acquisition strategy

While HDFS works as a good storage infrastructure for storing massive data, providing optimized & fault tolerant data serving capability, we do need separate strategy for acquiring data.

We need high portability or interoperability , scalability and fault tolerance also so that we can reliably gather data from wide range of sources.

Flume is one such platform which can be used to capture data into HDFS.
  
Flume introduction 

Flume is a tool for capturing data from various sources and offers a very flexible way to capture large volume data into HDFS.


It provides features to capture data from multiple sources like logs of web applications or command output.

As a strategy it segregates accumulation of data and consumption of the same.
By providing a storage facility between the two , it can handle a large amount of difference in the accumulation rate and consumption rate.

With persistence between storage and consumption, it guarantees delivery of the captured data.

By having chaining capability and networking between various flume units(agents) it provides a complex and conditional data delivery capability which is also scalable horizontally.

This is not centrally controlled so it is not having any single point of failure.


Flume is designed to run stand alone as JVM process, each one is called an AGENT.
Every capture of data payload as byte array into the Agent is called an EVENT.

Typically , there is a client process , which will be the source of events. Flume can handle various types of data input as shown,-

Avro, log4j, syslog, Http POST(with JSON Body) ,twitter input and Exec source,last one being the out put of a local process call.

Source is a component which is responsible for capturing input. There are multiple type of sources by the nature of input that we are capturing.

Sink is the component which is persisting or delivering the captured data. Depending upon where the data is being delivered, there are various types of sinks like file system, hdfs etc.

Between source and the sink, there is CHANNEL. Channel holds data after it is written to by source and till it is consumed and delivered by sink.

Delivery guarantee is achieved by selecting persistent channel and it holds the data till it is delivered by sink. In case the Flume agent goes down before all of the data is delivered, it can redo delivery once it comes back up as it is retained by channel. However, in case the channel is none persistent,  such guarantee can not work.

All configuration changes are reflected without restart and the flume agents can be chained to create complex and conditional topology.

This done by chaining the sink of one agent to the source of another.
 
Flume set up
 
Flume gzip file needs to be downloaded and extracted to a folder. Below picture shows the flume version 1.5.2 was downloaded and extracted





We shall be setting up the components as shown below,-

In the below configuration ,The agent is called 'agent1' which will have source 's1' , channel 'c1' and sink 'k1'
s1, c1 and k1 will have relevant properties and at the end source s1 and the sink k1 is linked to channel c1.

Here telnet is taken as source for simple testing, and hdfs path is provided to the sink.

Open flume-conf.properties.template in conf folder under the folder created by extraction of the flume set up, and provide the following entries. All other example entries can be commented.

agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1
# For each one of the sources, the type is defined'
#telnet source indicated by type netcat and port as provided on localhost
agent1.sources.s1.type = netcat
agent1.sources.s1.bind = localhost
agent1.sources.s1.port = 44444

# Each sink's type must be defined, hdfs sink at specified url is given

#if needed, the host name (PRUBNode1) and port should be changed.
#host and port should be in line with hdfs core-site.xml config.
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path =hdfs://PRUBNode1:9020/flume/webdata/

# Each channel's type is defined.

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

#link source and sink to channel
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1


Special attention should be given to hdfs.path the property should be as per fs.default.name in core site.The matching entry in my case is as under in core-site.xml.

<property>
        <name>fs.default.name</name>
        <value>hdfs://PRUBNode1:9020</value>
    </property>


The part /flume/webdata is arbitrary and will be created by flume if not already present.
care should be taken for authentication. We are assuming that the user running flume is the same or used sudo to hadoop user.
Test
Now to test, first we have to start HDFS. so we need to move to HADOOP_HOME/bin and issue ./start-dfs.sh or ./start-all.sh

After HDFS is started and we an see that datanode , name node , secondary name node are active , then we can fire the following command to start flume from the bin folder under the folder created in the extraction of tar file as part of set up,-

  ./flume-ng agent -n agent1 -c conf -f ../conf/flume-conf.properties.template

 flume-ng is the executable command to start the flume following are the parameters explained,-

agent -n : choice of the agent name should come after this , in this case 'agent1'
conf -f: choice of the config file in this case flume-conf.properties.template under conf folder. we updated this properties file few steps before. 

Once this command is fired,there will be a lot of out put on screen, however following would be something where the out put should stop and you shall be able to see as under,-
14/11/30 02:02:38 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

Since the flume agent is started as foreground process,terminating the command will stop the agent . So we shall open another terminal and issues following command to connect to flume,-
telnet localhost 44444
Once the prompt comes back, we can use telnet prompt to input data by typing

hello world[return]
telnet returns>OK
first upload[return]
telnet returns>OK
second upload[return]
telnet returns>OK



Now we can go to admin console of hdfs to check file,-

Saturday, 22 November 2014

Introduction to Data science Part 5:map red set up

Parag Ray
29-Sep-2014

Introduction

Welcome to the readers!

This is the fifth part of the series of articles. Here we actually start setting up map reduce in a multi node cluster and have a hands on experience of the map reduce.

If you have not gone through already,You would necessarily  go back to fourth part of the multi node set up as this post will depend on the multi node set up.

Please note the related readings and target audience section to get help to better follow the blog.

 
We are assuming the operating system to be Ubuntu 14.04.

Agenda
  • Target audience
  • Related readings/other blogs
  • MR configuration
  • Test
Target audience

  • This is an intermediate level discussion on Hadoop and related tools.
  • Best suited for audience who are looking for introduction to this technology.
  • Prior knowledge in java and Linux required.
  • Intermediate level understanding of networking necessary.
Related readings/other blogs  
You would also like to look at Cloudera home page & Hadoop home page for further details.

MR configuration

We shall do the configuration for master first.
Please move to $HADOOP_HOME folder and then conf folder under this.
All Map reduce and HDFS configuration files are located here.
Add the following entries to mapred-site.xml:
<configuration>     
<property>
         <name>mapred.job.tracker</name>
         <value>PRUBNode1:9001</value>
     </property>
<property>
         <name>mapred.local.dir</name>
         <value>/home/parag/work/mrhome/localdir</value>
     </property>
<property>
         <name>mapred.system.dir</name>
         <value>/home/parag/work/mrhome/sysdir</value>
     </property>
</configuration> 
 
Please note: PRUBNode1 is my hostname for the master node, please change it 
for host name of your master node.
mapred.local.dir  & mapred.system.dir  should be given a generic directory name 
which has to be the same (once we move to multinode) for all nodes. Directory needs 
be separately created as they will not be created by Hadoop.

For slave also the same configuration change is needed.

Start the hadoop daemons from the master node, issue following command from  $HADOOP_HOME/bin folder:
$ bin/start-all.sh 


as discussed in the first few blogs in this series, the Jobtracker is the master for map reduce and it should be visible now if you issue the following command,-

jps

Sample out put from Master,
parag@PRUBNode1:~/work/hadoop-1.2.1/bin$ jps
9530 DataNode
9788 JobTracker
9700 SecondaryNameNode
9962 TaskTracker
9357 NameNode
10252 Jps
 

Job tracker also has a built in UI for basic monitoring,-
http://<<hostname>>:50030/  for example in my case it is http://PRUBNode1:50030


TEST MR installation

Test 1: issue jps command and you would expect to see out put as in previous section.  master will show all jobs . Slave will show only datanode and task tracker.

Test 2:
go to admin UI (http://<<hostname>>:50030) and check all nodes are visible or not.


Test 3: run a test job

Copy the input files into the distributed filesystem:
$ bin/hadoop fs -put conf input
Run some of the examples provided:
$ bin/hadoop jar hadoop-examples-*.jar grep input output 'dfs[a-z.]+'
Examine the output files:
Copy the output files from the distributed filesystem to the local filesytem and examine them:
$ bin/hadoop fs -get output output
$ cat output/*
or
View the output files on the distributed filesystem:
$ bin/hadoop fs -cat output/*
When you're done, stop the daemons with:
$ bin/stop-all.sh
 

Thursday, 20 November 2014

Introduction to Data science Part 4:HDFS multi-node set up

Parag Ray
29-Sep-2014

Introduction

Welcome to the readers!

This is the fourth part of the series of articles. Here we actually start setting up HDFS in a multi node cluster and have a hands on experience of the HDFS.

If you have not gone through already,You would necessarily  go back to third part of the single node set up(see related readings) as this post will depend on the single node set up.

Please note the related readings and target audience section to get help to better follow the blog.

 
We are assuming the operating system to be Ubuntu 14.04.


Agenda
  • Target audience
  • Related readings/other blogs
  • Multi-node cluster setup steps
Target audience

  • This is an intermediate level discussion on Hadoop and related tools.
  • Best suited for audience who are looking for introduction to this technology.
  • Prior knowledge in java and Linux required.
  • Intermediate level understanding of networking necessary.
Related readings/other blogs  
Please refer MR setup first part in section 3 first.You would also like to look at Cloudera home page & Hadoop home page for further details.

Multi-node cluster setup steps

Multi-node set up requires that single node set up is done in each of the nodes. This can be done any time before step 3 Hadoop configuration below.

  1. Network connect
     Connect the underlying network
  2. Ssh connectivity
     Setup ssh connection
  3. HDFS configuration for multi-node. 
Network connect

A home network is demonstrated here.The set up is academic and not production quality. A full blown setup would set up comprise of all components including segment A to B below, but we are covering only segment B here with two nodes, one master and one slave.IP 4 network is shown here.

 Ip address set up:
Navigate to Use system settings > network>wired connection and turn the Wired network on.


Set 'manual' 'ip4' config and provide ip 'address' and use same 'netmask' for all PCs in network. Network config details may come under 'Options' button


In case you are using other Linux , the set up may be different. Here is one example for oracle linux.

SSH connectivity

SSH connectivity is used for connectivity may be used across nodes.
This needs to be connectivity across nodes where data communication is expected. Two way connectivity between master and slave node is a must.
Self connectivity is also a must.









In the above diagram NN indicate name node and DN indicate datanode of course.
  • Master and slaves talk via ssh connectivity.
     
  • We need ssh without need to provide credential for each requests.
     
  • The set up for ssh without password to provided is not ideal for security but
    In this case for each transaction of hadoop, providing credential is not practical.
     
  • There are other ways ,but for the purpose of this presentation, we shall stick to this set up
     
  • Both master and slave nodes must open ssh connectivity to each other  and
    to themselves.
Now for details steps

  • It is assumed that for all these we are using  the hadoop user. Sudo to respective user is also an option.
  • First from master node check that  ssh to the localhost  happens without a passphrase:
ssh localhost
  • If passphrase is requested, execute the following commands:
ssh-keygen -t rsa -P “” -f ~/.ssh/id_rsa
(The part -f ~/.ssh/id_rsa is not mandatory as that is the default location and system will ask for this location if not provided)
  • Copy public key to authorized_keys file
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  • On master node try to ssh again to localhost and if passphrase is still requested, do the  following
chmod 700 $HOME $HOME/.ssh
chmod 600 $HOME/.ssh/authorized_keys
chown  <<hadoop user>> $HOME/.ssh/authorized_keys

it will be a good idea to check if you can do
ssh localhost and also ssh <<nodename>>
  • If after executing ssh localhost any error message is returned that indicates that ssh server is not installed you may like to try the following,-
    • Use the following command to install ssh server as under,-
sudo apt-get install openssh-server

  • Then can repeat the steps starting from trying ssh localhost
  • Now public key needs to be copied to all of  slave machine (only one here).  Ssh should be installed in all slave machines. We do strongly recommend ssh understanding.
scp ~/.ssh/id_rsa.pub <<slave host name>>:~/.ssh/<<master host name>>.pub   (/etc/hosts should be updated with slave and master host entries )
  • Now login (ssh <<slave host name>>)  to your slave machine. Password will be needed  still.
  • While on your slave machine ,issue the following command  to append your master machine’s hadoop user’s public key to the slave machine’s  authorized key store,
cat ~/.ssh/<<master host name>>.pub >> ~/.ssh/authorized_keys
  • Issue exit to come out of slave
  • Now, from the master node try to ssh to slave.
ssh <<slave host name>>
  • if passphrase is still requested, do the  following after  logging in with password to slave
chmod 700 $HOME $HOME/.ssh
chmod 600 $HOME/.ssh/authorized_keys 
chown  <<hadoop user>> $HOME/.ssh/authorized_keys
  • Exit and log in again from your master node.
ssh <<slave host name>>
  • if it still asks for password , log in using password, log out and issue 
ssh-add
  • Repeat for all Hadoop Cluster Nodes which now will point back to master.
 HDFS configuration for multi-node.

 /etc/hosts
should have all host names
parag@PRUBNode1:/etc$ cat hosts
10.192.0.2    PRUBNode2
10.192.0.1    PRUBNode1
127.0.0.1    localhost
 

there could be one problem that might happen out of hosts file.
 to detect this problem run sudo netstat -ntlp on the master , it shows:
tcp6 0 0 127.0.0.1:9020 :::* LISTEN 32646/java
This 127.0.0.1 means that it is only listening to connection on 9020 which is from localhost , all the connection on 9020 from outside cannot be received.


this may be resolved by preventing entries that point local host for both actual ip and 127.0.0.1.
the entry shown above will not have this problem but the following will


10.192.0.1     localhost
 10.192.0.2    PRUBNode2
10.192.0.1    PRUBNode1
127.0.0.1      localhost


 there is likely to be problems if hostnames are missed out as well.
slaves
slaves file in the folder $HADOOP_HOME/conf ,should have all the node names that has data node running
 we have proved the listing as below,
parag@PRUBNode1:~/work/hadoop-1.2.1/conf$ cat slaves
PRUBNode1
PRUBNode2

master
master file n the folder $HADOOP_HOME/conf should have all the node names that has secondarynamenode running
parag@PRUBNode1:~/work/hadoop-1.2.1/conf$ cat masters
PRUBNode1

hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.name.dir</n

ame>
<value>/home/parag/work/hdf/name</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/parag/work/hdf/data</value>
</property>
</configuration>


all folders should be generic and same across all nodes

core-site.xml
<configuration>

    <property>
        <name>fs.default.name</name>
        <value>hdfs://PRUBNode1:9020</value>
    </property>
<property>
        <name>hadoop.tmp.dir</name>
        <value>/home/parag/work/hdf/tmp</value>
    </property>

</configuration>

all folders should be generic and same across all nodes. we are now mentioning the nodename of master for fs.defaultt.name instead of local host as we expect it to be accessed from different nodes.

 Starting HDFS

In the name node containing Node issue command 
./start-dfs.sh 
from $HADOOP_HOME/bin folder

it should start name node , data node and secondary name node  in master and data node in slave.

if it is first time, name node format to be done. 

Test set up

Test 1: Issue command jps at master
parag@PRUBNode1:~/work/hadoop-1.2.1/bin$ jps
9530 DataNode
9700 SecondaryNameNode
9357 NameNode
10252 Jps
 


Issue command jps at slave will show only DataNode

Test 2: Open administrative console at http://<<hostname of master>>:50070
for me it is http://PRUBNode1:50070