TechAlpine – The Technology world

Steps to write Map Reduce program using Hadoop framework and java

MapReduce

MapReduce

Overview: Big data is a new paradigm and its processing is the most important area to concentrate. Big data refers to the big volume of data which comes from different sources like social media, sensor data, mobile data, internet data and many more. In this article we will concentrate on the processing part using Hadoop framework and Map-Reduce programming. Map-Reduce can be defined as a special type of programming framework used to process huge amount of data in a distributed framework called commodity hardware. So the article will describe the Map-Reduce concepts and its practical implementation using Java code.

Introduction: Hadoop MapReduce can be defined as a software programming framework used to process big volume of data (in terabyte level) in parallel environment of clustered nodes. The cluster consists of thousands of nodes of commodity hardware. The processing is distributed, reliable and fault tolerant. A typical MapReduce job is performed according to the following steps

1) Split the data into independent chunks based on key-value pair. This is done by Map task in a parallel manner.

2) The out put of the Map job is sorted based on the key values.

3) The sorted output is the input to the Reduce job. And then it produces the final output to the processing and returns the result to the client.

Note: Commodity hardware is basically low cost computer systems

MapReduce framework: Apache Hadoop MapReduce framework is written in Java. The framework consists of master-slave configuration. The master is known as JobTracker and the slaves are known as TaskTrackers.The master controls the task processed on the slaves (which are nothing but the nodes in a cluster). The computation is done on the slaves. So the compute and storages nodes are the same in a clustered environment. The concept is ‘ move the computation to the nodes where the data is stored’, and it makes the processing faster.

MapReduce Processing: The MapReduce framework model is very light weight. So the cost of hardware is low compared to other frameworks. But at the same time we should understand that the model works efficiently only in a distributed environment as the processing is done on nodes where the data resides. The other features like scalability, reliability and fault tolerance also works well on distributed environment.

MapReduce implementation: Now we will discuss about the implementation of MapReduce model using Java programming platform. Following are the different components of the entire end to end implementation.

  • The client program which is the driver class and initiates the process
  • The Map function which performs the split using the key-value pair.
  • The Reduce function which aggregate the processed data and send the output back to the client.

Driver class: Following is a driver class which binds the Map and Reduce function and starts the processing. This is the client program which initiates the process. 

Listing1: The client program (driver class) initiating the process

[code]

package com.mapreduce.techalpine;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**

* @author kaushik pal

*

* This is the main driver class to initiate the map-reduce

* process. It sets the back ground for the entire process and

* Then starts it.

*/

public class DevXDriver {

public static void main(String[] args) throws Exception {

// Initiate configuration

Configuration configx = new Configuration();

// Add resource files

configx.addResource(new Path(“/user/hadoop/core-site.xml”));

configx.addResource(new Path(“/user/hadoop/hdfs-site.xml”));

// Create MapReduce job

Job devxmapjob = new Job(configx,”DevXDriver.class”);

devxmapjob.setJarByClass(DevXDriver.class);

devxmapjob.setJobName(“DevX MapReduce Job”);

// Set output kay and value class

devxmapjob.setOutputKeyClass(Text.class);

devxmapjob.setOutputValueClass(Text.class);

// Set Map class

devxmapjob.setMapperClass(DevXMap.class);

// Set Combiner class

devxmapjob.setCombinerClass(DevXReducer.class);

// Set Reducer class

devxmapjob.setReducerClass(DevXReducer.class);

// Set Map output key and value classes

devxmapjob.setMapOutputKeyClass(Text.class);

devxmapjob.setMapOutputValueClass(Text.class);

// Set number of reducer tasks

devxmapjob.setNumReduceTasks(10);

// Set input and output format classes

devxmapjob.setInputFormatClass(TextInputFormat.class);

devxmapjob.setOutputFormatClass(TextOutputFormat.class);

// Set input and output path

FileInputFormat.addInputPath(devxmapjob, new Path(“/user/map_reduce/input/”));

FileOutputFormat.setOutputPath(devxmapjob,new Path(“/user/map_reduce/output”));

// Start MapReduce job

devxmapjob.waitForCompletion(true);

}

}

[/code]

Map function: This is responsible for splitting the data based on the key-value pair. This is known as mapping of data.

Listing2: This is a Map function splitting the data into chunks

[code]

package com.mapreduce.techalpine;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.StringTokenizer;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**

* @author kaushik pal

*

* This is the map process. It does the mapping for keyword-value pair.

*/

public class DevXMap extends Mapper<LongWritable, Text, Text,Text> {

// Create Path, BufferedReader and Text variables

Path file_path;

BufferedReader buffer_reader;

Text tweet_values = new Text();

/**

* @param key

* @param value

* @param context

*/

public void map(LongWritable key, Text value, Context context)  {

try{

// Create configuration for Map

Configuration map_config = new Configuration();

// Load Hadoop core files in configuration

map_config.addResource(new Path(“/user/hadoop/core-site.xml”));

map_config.addResource(new Path(“/user/hadoop/hdfs-site.xml”));

// Create variables

String searchkeyword = “”;

// Open file from the file path

file_path=new Path(“files/repository/keys.txt”);

FileSystem file_system = FileSystem.get(URI.create(“files/repository/keys.txt”),new Configuration());

// Load buffer reader

buffer_reader=new BufferedReader(new InputStreamReader(file_system.open(file_path)));

while(buffer_reader.ready())

{

searchkeyword=buffer_reader.readLine().trim();

}

// Get key value

final Text key_value = new Text(searchkeyword);

// Check value and take decision

if(value == null)

{

return;

}

else{

StringTokenizer string_tokens = new StringTokenizer(value.toString(),”,”);

int count = 0;

while(string_tokens.hasMoreTokens()) {

count ++;

if(count <=1)

continue;

String new_tweet_value = string_tokens.nextToken().toLowerCase().trim().replaceAll(“\\*”,””);

if(new_tweet_value.contains(searchkeyword.toLowerCase().trim()))                                                                                          {

tweet_values.set(new_tweet_value);

context.write(key_value,tweet_values);

}

}

}

}

catch(Exception e){

e.printStackTrace();

}

}

}

[/code] 

Reduce function: This is responsible for aggregating the data. The aggregation is done based on the key values. So after processing and sorting the aggregation completed and sends the result back to the client program.

Listing3: The Reduce function aggregates the processed data

[code]

package com.mapreduce.techalpine;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.RandomAccess;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/**

* @author kaushik pal

*

* This is the reducer function. It aggregates the output based on the

* sorting of key-value pairs.

*/

public class DevXReducer extends Reducer<Text ,Text,Text,Text>

{

// Create variables for file path

Path positive_file_path;

Path negative_file_path;

Path output_file_path;

Path keyword_file_path;

// Create variables for buffer

BufferedReader positive_buff_reader;

BufferedReader negative_buff_reader;

BufferedReader keyword_buff_reader;

// Create variables for calculation

static Double total_record_count=new Double(“0”);

static Double count_neg=new Double(“0”);

static Double count_pos=new Double(“0”);

static Double count_neu=new Double(“0”);

static Double percent_neg=new Double(“0”);

static Double percent_pos=new Double(“0”);

static Double percent_neu=new Double(“0”);

Pattern pattrn_matcher;

Matcher matcher_txt;

static int new_row=0;

FSDataOutputStream out_1st,out_2nd;

/**

* @param key

* @param values

* @param context

* @throws IOException

* @throws InterruptedException

*/

public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException

{

// Create configuration for reducer

Configuration reduce_config = new Configuration();

// Load hadoop config files

reduce_config.addResource(new Path(“/user/hadoop/core-site.xml”));

reduce_config.addResource(new Path(“/user/hadoop/hdfs-site.xml”));

// Create variables

String key_word = “”;

String check_keyword=key_word;

keyword_file_path=new Path(“files/repository/keys.txt”);

FileSystem file_system_read = FileSystem.get(URI.create(“files/repository/keys.txt”),new Configuration());

keyword_buff_reader=new BufferedReader(new InputStreamReader(file_system_read.open(keyword_file_path)));

FileSystem get_filesys = FileSystem.get(reduce_config);

FileSystem get_filesys_posneg = FileSystem.get(reduce_config);

Path path_output = new Path(“/user/sentiment_output_file.txt”);

Path path_output_posneg = new Path(“/user/posneg_output_file.txt”);

// Get keyword

while(keyword_buff_reader.ready())

{

key_word=keyword_buff_reader.readLine().trim();

}

// Check file system

if (!get_filesys.exists(path_output)) {

out_1st = get_filesys.create(path_output);

out_2nd = get_filesys_posneg.create(path_output_posneg);

}

// Check keyword matching using positive and negative dictionaries

if(check_keyword.equals(key.toString().toLowerCase()))

{

for(Text new_tweets:values)

{

// Load positive word dictionary

positive_file_path=new Path(“/user/map_reduce/pos_words.txt”);

FileSystem filesystem_one = FileSystem.get(URI.create(“files/pos_words.txt”),new Configuration());

positive_buff_reader=new BufferedReader(new InputStreamReader(filesystem_one.open(positive_file_path)));

// Load negative word disctinary

negative_file_path = new Path(“/user/map_reduce/neg_words.txt”);

FileSystem filesystem_two = FileSystem.get(URI.create(“files/neg_words.txt”),new Configuration());

negative_buff_reader =new BufferedReader(new InputStreamReader(filesystem_two.open(negative_file_path)));

++total_record_count;

boolean first_flag=false;

boolean second_flag=false;

String all_tweets=new_tweets.toString();

String first_regex = “”;

String second_regex = “”;

while(positive_buff_reader.ready())

{

first_regex=positive_buff_reader.readLine().trim();

new_row++;

pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

first_flag=matcher_txt.find();

 

if(first_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(new Text(first_regex),new Text(all_tweets));

break;

}

}

while(negative_buff_reader.ready())

{

new_row++;

second_regex=negative_buff_reader.readLine().trim();

pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

second_flag=matcher_txt.find();

if(second_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(new Text(second_regex),new Text(all_tweets));

break;

}

}

if(first_flag&second_flag)

{

++count_neu;

}

else

{

if(first_flag)

{

++count_pos;

}

if(second_flag)

{

++count_neg;

}

if(first_flag==false&second_flag==false)

{

++count_neu;

}

}

// Close buffers

negative_buff_reader.close();

positive_buff_reader.close();

}

// Calculate percent values

percent_pos=count_pos/total_record_count*100;

percent_neg=count_neg/total_record_count*100;

percent_neu=count_neu/total_record_count*100;

try{

// Write to the files

out_1st.writeBytes(“\n”+key_word);

out_1st.writeBytes(“,”+total_record_count);

out_1st.writeBytes(“,”+percent_neg);

out_1st.writeBytes(“,”+percent_pos);

out_1st.writeBytes(“,”+percent_neu);

// Close file systems

out_1st.close();

get_filesys.close();

}catch(Exception e){

e.printStackTrace();

}

}

}

}

[/code]

Conclusion: In this article I have discussed the MapReduce processing using Java programming environment. The different component like Map and Reduce function perform the main task and returns the output to the client. The processing performs efficiently on distributed environment only. So we should set up the Apache Hadoop framework on a distributed environment to get the best result.

Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.

Tagged on: ,

Leave a Reply

Your email address will not be published. Required fields are marked *


4 − 1 =

TechAlpine Books
-----------------------------------------------------------