1 April 2014
Published by BluePi
Implementing Join in Hadoop Map-Reduce
Let's define the problem statement first.
There are two data sources, ds1 and ds2. ds1 is represented by data1.txt, which contains multiple columns separated by commas; we are going to deal with just two of them (groupkey and status1). A record in data1.txt has this layout:

groupkey,x1,x2,x3,x4,status1

and a sample record looks like this:

1,"some value","some value","some value","some value","Ready"

ds2 is another CSV data source contained in data2.txt, which also contains multiple comma-separated columns; here we are going to deal with two columns (groupkey and status2). Its layout and a sample record:

groupkey,x5,x6,x7,x8,status2
1,"some value","some value","some value","some value","Pending"
Our aim is to get an output data source containing the status1 and status2 columns corresponding to the same groupkey. Let's define the various components that will allow us to achieve this objective. We will be using a Maven project to demonstrate the code, so let's create a Maven project first and add the following dependency to the pom.xml.
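As a minimal sketch, assuming the classic hadoop-core artifact (which provides the old org.apache.hadoop.mapred API used in the code below), the dependency would look something like this:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <!-- version is an assumption; any 1.x release with the old mapred API should work -->
    <version>1.2.1</version>
</dependency>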
Now let's write some code. First, we need to define a Mapper class and implement the map method inside it.
// uses the old API: org.apache.hadoop.io.* and org.apache.hadoop.mapred.*
public class Mapper1 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    private String commonkey, status1, fileTag = "s1~";

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        // take one line/record at a time and parse it into key-value pairs
        String values[] = value.toString().split(",");
        commonkey = values[0].trim();
        // status1 is the last column (index 5) of data1.txt
        status1 = values[5].trim();
        // send the key-value pair out of the mapper, tagged with its source
        output.collect(new Text(commonkey), new Text(fileTag + status1));
    }
}
The map method defined above processes data1.txt and frames the initial key-value pairs:
Key (Text) – commonkey
Value (Text) – an identifier indicating the input source (the "s1~" tag for the data1.txt file) + status1
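For example, for the sample data1.txt record shown earlier, Mapper1 would emit the pair:

(1, "s1~Ready")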
Define another Mapper which does the same for the other data source, ds2.
public class Mapper2 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    // variables to process the delivery report
    private String commonkey, status2, fileTag = "s2~";

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        String values[] = line.split(",");
        commonkey = values[0].trim();
        // status2 is the last column (index 5) of data2.txt
        status2 = values[5].trim();
        output.collect(new Text(commonkey), new Text(fileTag + status2));
    }
}
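With both mappers in place, the shuffle phase groups the tagged values by groupkey. For the two sample records above, the reducer defined next would receive something like the following (the order of values within the iterator is not guaranteed):

key:    1
values: "s1~Ready", "s2~Pending"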
It's time to implement a Reducer now.
public class StatusReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    // variables to aid the join process
    private String status1, status2;

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                       Reporter reporter) throws IOException {
        while (values.hasNext()) {
            String currValue = values.next().toString();
            String splitVals[] = currValue.split("~");
            // identify which source this value for the commonkey came from
            // and parse it accordingly
            if (splitVals[0].equals("s1")) {
                status1 = splitVals[1] != null ? splitVals[1].trim() : "status1";
            } else if (splitVals[0].equals("s2")) {
                // value came from the second file; use it to obtain status2
                status2 = splitVals[1] != null ? splitVals[1].trim() : "status2";
            }
        }
        // emit the joined pair once all values for this groupkey have been seen
        output.collect(new Text(status1), new Text(status2));
    }
}
To run the Hadoop join we need to write a class that becomes the job runner. Various parameters like the Mappers, the Reducer and the input/output paths need to be configured in the job configuration for it to run correctly. The following configuration needs to be set up:
public class Executor extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Executor.class);
        conf.setJobName("SMS Reports");

        // set key/value types for the mapper and reducer outputs
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // specify the custom reducer class
        conf.setReducerClass(StatusReducer.class);

        // specify the input directories (at runtime) and the Mappers independently
        // for inputs from multiple sources
        MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, Mapper2.class);

        // specify the output directory at runtime
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Executor(), args);
        System.exit(res);
    }
}
The next step is to create a jar file, which can be done simply by running the following command at the root of your project directory:
mvn clean package
This creates the desired jar file (let's call it mapreduce-join-example.jar) in the target folder of your project; transfer the jar to the master node of the Hadoop cluster. We also need to copy the files to be processed onto HDFS before we run the job, which can be done with the following commands:
hadoop dfs -copyFromLocal /home/ds1/data1.txt /hdfs_home/data1
hadoop dfs -copyFromLocal /home/ds2/data2.txt /hdfs_home/data2
Once the files are copied, we can run the jar on the Hadoop cluster using the following command:
hadoop jar mapreduce-join-example.jar com.bluepi.join.Executor /hdfs_home/data1 /hdfs_home/data2 /hdfs_home/output
And that's it, your output directory should contain the resultant file with the desired results.
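To check the result, you can print the output file (the part-00000 name assumes the single default reducer of the old API):

hadoop dfs -cat /hdfs_home/output/part-00000

For the two sample records used above, this should print a single tab-separated line, Ready followed by Pending, as emitted by the reducer.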