Steps to Write a MapReduce Program Using the Hadoop Framework and Java

MapReduce

Overview: Big data is a relatively new paradigm, and processing it is one of the most important areas to focus on. Big data refers to the large volumes of data that come from many different sources, such as social media, sensors, mobile devices and the internet. This article concentrates on the processing part, using the Hadoop framework and MapReduce programming. MapReduce is a programming framework used to process huge amounts of data in a distributed fashion across clusters of commodity hardware. The article describes the MapReduce concepts and then shows a practical implementation in Java.

Introduction: Hadoop MapReduce is a software programming framework used to process large volumes of data (at the terabyte level) in parallel across a cluster of nodes. The cluster can consist of thousands of nodes of commodity hardware. The processing is distributed, reliable and fault tolerant. A typical MapReduce job is performed in the following steps:

1) The input data is split into independent chunks. The Map tasks process these chunks in parallel and emit key-value pairs.

2) The output of the Map tasks is sorted by key.

3) The sorted output is the input to the Reduce tasks, which produce the final output of the job and return the result to the client.
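
The three steps above can be imitated in plain Java without a cluster. The following is only a minimal in-memory sketch, not Hadoop code: the class `MiniMapReduce` and its methods are hypothetical names chosen for illustration, and word counting is used as the example job.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a single-process imitation of the map, sort and reduce steps.
public class MiniMapReduce {

    // Step 1 (map): emit one (word, 1) pair for every word in a line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Step 2 (sort): group the values by key; a TreeMap keeps the keys sorted.
    static TreeMap<String, List<Integer>> sortByKey(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Step 3 (reduce): aggregate all values collected for one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Run the three steps in order over a list of input lines.
    static TreeMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : sortByKey(pairs).entrySet()) {
            result.put(entry.getKey(), reduce(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {big=2, cluster=1, data=1}
        System.out.println(run(Arrays.asList("big data", "big cluster")));
    }
}
```

In a real Hadoop job the map calls run on many nodes at once and the framework performs the sort between the two phases; here everything happens in one loop purely to show the data flow.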

Note: Commodity hardware is basically low-cost computer systems.

MapReduce framework: The Apache Hadoop MapReduce framework is written in Java. The framework uses a master-slave configuration. The master is known as the JobTracker and the slaves are known as TaskTrackers (this is the classic Hadoop 1.x architecture; from Hadoop 2 onwards these roles are handled by YARN). The master controls the tasks processed on the slaves, which are simply the nodes in the cluster. The computation is done on the slaves, so the compute and storage nodes are the same in a clustered environment. The concept is "move the computation to the nodes where the data is stored", and it makes the processing faster.

MapReduce Processing: The MapReduce framework model is very lightweight, so the cost of hardware is low compared to other frameworks. At the same time, we should understand that the model works efficiently only in a distributed environment, because the processing is done on the nodes where the data resides. Other features such as scalability, reliability and fault tolerance also work well in a distributed environment.

MapReduce implementation: Now we will discuss the implementation of the MapReduce model using the Java programming platform. The end-to-end implementation consists of the following components.

  • A client program, which is the driver class and initiates the process.
  • A Map function, which splits the data into key-value pairs.
  • A Reduce function, which aggregates the processed data and sends the output back to the client.

Driver class: The following driver class binds the Map and Reduce functions together and starts the processing. This is the client program that initiates the process.

Listing 1: The client program (driver class) initiating the process

[code]
package com.mapreduce.techalpine;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author kaushik pal
 *
 * This is the main driver class to initiate the map-reduce
 * process. It sets the background for the entire process and
 * then starts it.
 */
public class DevXDriver {

    public static void main(String[] args) throws Exception {
        // Initiate configuration
        Configuration configx = new Configuration();

        // Add resource files
        configx.addResource(new Path("/user/hadoop/core-site.xml"));
        configx.addResource(new Path("/user/hadoop/hdfs-site.xml"));

        // Create MapReduce job
        Job devxmapjob = new Job(configx, "DevXDriver.class");
        devxmapjob.setJarByClass(DevXDriver.class);
        devxmapjob.setJobName("DevX MapReduce Job");

        // Set output key and value class
        devxmapjob.setOutputKeyClass(Text.class);
        devxmapjob.setOutputValueClass(Text.class);

        // Set Map class
        devxmapjob.setMapperClass(DevXMap.class);

        // Set Combiner class
        devxmapjob.setCombinerClass(DevXReducer.class);

        // Set Reducer class
        devxmapjob.setReducerClass(DevXReducer.class);

        // Set Map output key and value classes
        devxmapjob.setMapOutputKeyClass(Text.class);
        devxmapjob.setMapOutputValueClass(Text.class);

        // Set number of reducer tasks
        devxmapjob.setNumReduceTasks(10);

        // Set input and output format classes
        devxmapjob.setInputFormatClass(TextInputFormat.class);
        devxmapjob.setOutputFormatClass(TextOutputFormat.class);

        // Set input and output path
        FileInputFormat.addInputPath(devxmapjob, new Path("/user/map_reduce/input/"));
        FileOutputFormat.setOutputPath(devxmapjob, new Path("/user/map_reduce/output"));

        // Start MapReduce job and report success or failure through the exit code
        System.exit(devxmapjob.waitForCompletion(true) ? 0 : 1);
    }
}
[/code]
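
The driver asks for 10 reduce tasks, so the framework has to decide which reducer receives each map output key. By default Hadoop uses a hash partitioner that clears the sign bit of the key's hash code and takes the remainder modulo the number of reduce tasks. The sketch below reproduces that arithmetic in plain Java; `PartitionSketch` and `partitionFor` are hypothetical names, and `String.hashCode()` stands in for the hash of a Hadoop `Text` key, so the actual reducer numbers in a real job will differ.

```java
// Illustrative only: shows the arithmetic of hash partitioning, not Hadoop's own class.
public class PartitionSketch {

    // Clear the sign bit so the value is non-negative, then map it onto a reducer index.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 10; // same value as setNumReduceTasks(10) in Listing 1
        for (String key : new String[] {"hadoop", "mapreduce", "java"}) {
            System.out.println(key + " -> reducer " + partitionFor(key, reducers));
        }
    }
}
```

One consequence worth keeping in mind: all records that share a key go to the same reducer, so a job that emits only a few distinct keys cannot keep 10 reducers busy.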

Map function: This is responsible for splitting the data into key-value pairs. This is known as the mapping of data.

Listing 2: The Map function splitting the data into chunks

[code]
package com.mapreduce.techalpine;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author kaushik pal
 *
 * This is the map process. It does the mapping for keyword-value pair.
 */
public class DevXMap extends Mapper<LongWritable, Text, Text, Text> {

    // Create Path, BufferedReader and Text variables
    Path file_path;
    BufferedReader buffer_reader;
    Text tweet_values = new Text();

    /**
     * @param key
     * @param value
     * @param context
     */
    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            // Create configuration for Map
            Configuration map_config = new Configuration();

            // Load Hadoop core files in configuration
            map_config.addResource(new Path("/user/hadoop/core-site.xml"));
            map_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));

            // Create variables
            String searchkeyword = "";

            // Open file from the file path
            file_path = new Path("files/repository/keys.txt");
            FileSystem file_system = FileSystem.get(URI.create("files/repository/keys.txt"), new Configuration());

            // Load buffer reader; the last line of the file becomes the search keyword
            buffer_reader = new BufferedReader(new InputStreamReader(file_system.open(file_path)));
            while (buffer_reader.ready()) {
                searchkeyword = buffer_reader.readLine().trim();
            }
            buffer_reader.close();

            // Get key value
            final Text key_value = new Text(searchkeyword);

            // Check value and take decision
            if (value == null) {
                return;
            } else {
                StringTokenizer string_tokens = new StringTokenizer(value.toString(), ",");
                int count = 0;
                while (string_tokens.hasMoreTokens()) {
                    // Read the token before counting, so that the first field is really skipped
                    // (this appears to be the intent of the count check)
                    String token = string_tokens.nextToken();
                    count++;
                    if (count <= 1)
                        continue;

                    // Normalize the field and strip asterisks
                    String new_tweet_value = token.toLowerCase().trim().replaceAll("\\*", "");

                    // Emit (keyword, field) for every field that contains the keyword
                    if (new_tweet_value.contains(searchkeyword.toLowerCase().trim())) {
                        tweet_values.set(new_tweet_value);
                        context.write(key_value, tweet_values);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
[/code]
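
The filtering logic at the heart of the Map function can be tried out without a Hadoop installation. The following is a small sketch under stated assumptions: `KeywordFilterSketch` and `matchingFields` are hypothetical names, the record is a comma-separated line whose first field is assumed to be an identifier that should be skipped, and the returned list stands in for the calls to `context.write`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Illustrative only: the keyword filter of the mapper, extracted into a plain Java method.
public class KeywordFilterSketch {

    // Return every field after the first one that contains the keyword (case-insensitive).
    static List<String> matchingFields(String record, String keyword) {
        List<String> matches = new ArrayList<>();
        String needle = keyword.toLowerCase().trim();
        StringTokenizer tokens = new StringTokenizer(record, ",");
        int count = 0;
        while (tokens.hasMoreTokens()) {
            String token = tokens.nextToken();
            count++;
            if (count <= 1) {
                continue; // skip the first field (assumed to be an identifier)
            }
            // Same normalization as in the mapper: lower-case, trim, strip asterisks.
            String field = token.toLowerCase().trim().replaceAll("\\*", "");
            if (field.contains(needle)) {
                matches.add(field);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // prints [i love hadoop clusters]
        System.out.println(matchingFields("101, I love *Hadoop* clusters, Java is fine", "hadoop"));
    }
}
```

Keeping the text handling in a method like this makes it easy to check with ordinary unit tests before the logic is placed inside a `Mapper`.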

Reduce function: This is responsible for aggregating the data. The aggregation is done based on the key values. After the map output has been processed and sorted, the Reduce function completes the aggregation and sends the result back to the client program.

Listing 3: The Reduce function aggregating the processed data

[code]
package com.mapreduce.techalpine;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author kaushik pal
 *
 * This is the reducer function. It aggregates the output based on the
 * sorting of key-value pairs.
 */
public class DevXReducer extends Reducer<Text, Text, Text, Text> {

    // Create variables for file path
    Path positive_file_path;
    Path negative_file_path;
    Path output_file_path;
    Path keyword_file_path;

    // Create variables for buffer
    BufferedReader positive_buff_reader;
    BufferedReader negative_buff_reader;
    BufferedReader keyword_buff_reader;

    // Create variables for calculation
    static double total_record_count = 0;
    static double count_neg = 0;
    static double count_pos = 0;
    static double count_neu = 0;
    static double percent_neg = 0;
    static double percent_pos = 0;
    static double percent_neu = 0;

    Pattern pattrn_matcher;
    Matcher matcher_txt;
    static int new_row = 0;
    FSDataOutputStream out_1st, out_2nd;

    /**
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Create configuration for reducer
        Configuration reduce_config = new Configuration();

        // Load hadoop config files
        reduce_config.addResource(new Path("/user/hadoop/core-site.xml"));
        reduce_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));

        // Create variables
        String key_word = "";
        keyword_file_path = new Path("files/repository/keys.txt");
        FileSystem file_system_read = FileSystem.get(URI.create("files/repository/keys.txt"), new Configuration());
        keyword_buff_reader = new BufferedReader(new InputStreamReader(file_system_read.open(keyword_file_path)));

        FileSystem get_filesys = FileSystem.get(reduce_config);
        FileSystem get_filesys_posneg = FileSystem.get(reduce_config);
        Path path_output = new Path("/user/sentiment_output_file.txt");
        Path path_output_posneg = new Path("/user/posneg_output_file.txt");

        // Get keyword
        while (keyword_buff_reader.ready()) {
            key_word = keyword_buff_reader.readLine().trim();
        }
        keyword_buff_reader.close();

        // Take the comparison value only after the keyword has been read from the file
        String check_keyword = key_word.toLowerCase();

        // Check file system (the output streams are created only if the output file does not exist yet)
        if (!get_filesys.exists(path_output)) {
            out_1st = get_filesys.create(path_output);
            out_2nd = get_filesys_posneg.create(path_output_posneg);
        }

        // Check keyword matching using positive and negative dictionaries
        if (check_keyword.equals(key.toString().toLowerCase())) {
            for (Text new_tweets : values) {
                // Load positive word dictionary
                positive_file_path = new Path("/user/map_reduce/pos_words.txt");
                FileSystem filesystem_one = FileSystem.get(URI.create("files/pos_words.txt"), new Configuration());
                positive_buff_reader = new BufferedReader(new InputStreamReader(filesystem_one.open(positive_file_path)));

                // Load negative word dictionary
                negative_file_path = new Path("/user/map_reduce/neg_words.txt");
                FileSystem filesystem_two = FileSystem.get(URI.create("files/neg_words.txt"), new Configuration());
                negative_buff_reader = new BufferedReader(new InputStreamReader(filesystem_two.open(negative_file_path)));

                ++total_record_count;
                boolean first_flag = false;
                boolean second_flag = false;
                String all_tweets = new_tweets.toString();
                String first_regex = "";
                String second_regex = "";

                // Look for the first positive word that occurs in the tweet
                while (positive_buff_reader.ready()) {
                    first_regex = positive_buff_reader.readLine().trim();
                    new_row++;
                    pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    first_flag = matcher_txt.find();

                    if (first_flag) {
                        out_2nd.writeBytes(all_tweets);
                        context.write(new Text(first_regex), new Text(all_tweets));
                        break;
                    }
                }

                // Look for the first negative word that occurs in the tweet
                while (negative_buff_reader.ready()) {
                    new_row++;
                    second_regex = negative_buff_reader.readLine().trim();
                    pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    second_flag = matcher_txt.find();

                    if (second_flag) {
                        out_2nd.writeBytes(all_tweets);
                        context.write(new Text(second_regex), new Text(all_tweets));
                        break;
                    }
                }

                // A tweet with both or neither kind of word counts as neutral
                if (first_flag && second_flag) {
                    ++count_neu;
                } else {
                    if (first_flag) {
                        ++count_pos;
                    }
                    if (second_flag) {
                        ++count_neg;
                    }
                    if (!first_flag && !second_flag) {
                        ++count_neu;
                    }
                }

                // Close buffers
                negative_buff_reader.close();
                positive_buff_reader.close();
            }

            // Calculate percent values
            percent_pos = count_pos / total_record_count * 100;
            percent_neg = count_neg / total_record_count * 100;
            percent_neu = count_neu / total_record_count * 100;

            try {
                // Write to the files
                out_1st.writeBytes("\n" + key_word);
                out_1st.writeBytes("," + total_record_count);
                out_1st.writeBytes("," + percent_neg);
                out_1st.writeBytes("," + percent_pos);
                out_1st.writeBytes("," + percent_neu);

                // Close file systems
                out_1st.close();
                get_filesys.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
[/code]
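
The classification rule used by the Reduce function (a text is positive or negative if it matches only one of the two dictionaries, and neutral if it matches both or neither) can also be sketched in plain Java. This is an illustration under stated assumptions, not the article's reducer: `SentimentSketch` and its methods are hypothetical names, and the dictionaries are passed in as in-memory lists instead of being read from HDFS files.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative only: dictionary-based sentiment counting without any Hadoop classes.
public class SentimentSketch {

    enum Sentiment { POSITIVE, NEGATIVE, NEUTRAL }

    // True if any dictionary entry, treated as a case-insensitive regex, occurs in the text.
    static boolean containsAny(String text, List<String> words) {
        for (String word : words) {
            if (Pattern.compile(word, Pattern.CASE_INSENSITIVE).matcher(text).find()) {
                return true;
            }
        }
        return false;
    }

    // Both flags set or both flags clear means neutral; otherwise the single set flag decides.
    static Sentiment classify(String text, List<String> positive, List<String> negative) {
        boolean pos = containsAny(text, positive);
        boolean neg = containsAny(text, negative);
        if (pos == neg) {
            return Sentiment.NEUTRAL;
        }
        return pos ? Sentiment.POSITIVE : Sentiment.NEGATIVE;
    }

    // Percentages in the order positive, negative, neutral.
    static double[] percentages(List<String> texts, List<String> positive, List<String> negative) {
        double[] counts = new double[3];
        for (String text : texts) {
            counts[classify(text, positive, negative).ordinal()]++;
        }
        double[] percent = new double[3];
        for (int i = 0; i < 3; i++) {
            percent[i] = texts.isEmpty() ? 0 : counts[i] / texts.size() * 100;
        }
        return percent;
    }

    public static void main(String[] args) {
        List<String> positive = Arrays.asList("great", "good");
        List<String> negative = Arrays.asList("slow", "bad");
        List<String> texts = Arrays.asList("Great cluster", "slow job", "great but slow", "plain text");
        // prints [25.0, 25.0, 50.0]
        System.out.println(Arrays.toString(percentages(texts, positive, negative)));
    }
}
```

Loading the dictionaries once into lists, rather than reopening the files for every record, is a design choice of this sketch that would also reduce file system traffic in a real reducer.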

Conclusion: In this article I have discussed MapReduce processing using the Java programming environment. The different components, such as the Map and Reduce functions, perform the main tasks and return the output to the client. The processing performs efficiently only in a distributed environment, so we should set up the Apache Hadoop framework in a distributed environment to get the best results.

I hope you have enjoyed the article and will be able to apply it in your own programming. Keep reading.
