Steps to Write a MapReduce Program Using the Hadoop Framework and Java

MapReduce

Overview: Big data is a new paradigm, and processing it is the area that deserves the most attention. Big data refers to the large volumes of data that come from different sources such as social media, sensor data, mobile data, internet data and many more. In this article we concentrate on the processing part, using the Hadoop framework and MapReduce programming. MapReduce is a programming framework used to process huge amounts of data in parallel on clusters of low-cost commodity hardware. The article describes the MapReduce concepts and their practical implementation in Java code.

Introduction: Hadoop MapReduce is a software programming framework used to process large volumes of data (at the terabyte level) in parallel across a cluster of nodes. The cluster may consist of thousands of nodes of commodity hardware. The processing is distributed, reliable and fault tolerant. A typical MapReduce job is performed according to the following steps:

1) Split the data into independent chunks, which are processed as key-value pairs. This is done by the Map tasks in a parallel manner.

2) The output of the Map tasks is sorted based on the key values.

3) The sorted output becomes the input to the Reduce tasks, which produce the final output of the processing and return the result to the client.

Note: Commodity hardware refers to low-cost, readily available computer systems.
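
To make these steps concrete, here is a minimal word-count sketch (illustrative only, not part of the sample application developed later in this article): the Map phase emits a (word, 1) pair for every word in its chunk, the framework sorts and groups the pairs by key, and the Reduce phase sums the counts for each word.

[code]
// Minimal word-count sketch showing the three steps above with the Hadoop mapreduce API
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    // Step 1: each Map task turns its chunk of input lines into (word, 1) pairs
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Step 2 happens inside the framework: the pairs are sorted and grouped by key.
    // Step 3: the Reduce task aggregates all values that share a key
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
[/code]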

MapReduce framework: The Apache Hadoop MapReduce framework is written in Java. The framework follows a master-slave configuration: the master is known as the JobTracker and the slaves are known as TaskTrackers. The master controls the tasks processed on the slaves (which are simply the nodes of the cluster), and the computation is done on the slaves. So the compute nodes and the storage nodes are the same in a clustered environment. The concept is 'move the computation to the nodes where the data is stored', and this makes the processing faster.
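
As a small illustration of this master-slave setup, the sketch below (not from the article; it assumes a Hadoop 1.x style cluster, and the configuration path is hypothetical) shows how a client configuration identifies the JobTracker that jobs are submitted to.

[code]
// Sketch: inspect which JobTracker a client configuration points to.
// "mapred.job.tracker" is the classic Hadoop 1.x property for the master address;
// YARN-based clusters use a different resource-manager setting.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class ShowJobTracker {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/user/hadoop/mapred-site.xml"));
        // The default "local" means jobs run in-process instead of on a cluster
        System.out.println("JobTracker: " + conf.get("mapred.job.tracker", "local"));
    }
}
[/code]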

MapReduce Processing: The MapReduce framework model is very lightweight, so the hardware cost is low compared with other frameworks. At the same time, we should understand that the model works efficiently only in a distributed environment, because the processing is done on the nodes where the data resides. The other features, such as scalability, reliability and fault tolerance, also work best in a distributed environment.

MapReduce implementation: Now we will discuss the implementation of the MapReduce model using the Java programming platform. The following are the components of the entire end-to-end implementation:

  • The client program, which is the driver class and initiates the process.
  • The Map function, which splits the data into key-value pairs.
  • The Reduce function, which aggregates the processed data and sends the output back to the client.

Driver class: The following is the driver class, which binds the Map and Reduce functions together and starts the processing. This is the client program that initiates the process.

Listing 1: The client program (driver class) initiating the process

[code]
package com.mapreduce.techalpine;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author kaushik pal
 *
 * This is the main driver class to initiate the map-reduce
 * process. It sets the background for the entire process and
 * then starts it.
 */
public class DevXDriver {

    public static void main(String[] args) throws Exception {

        // Initiate configuration
        Configuration configx = new Configuration();

        // Add resource files
        configx.addResource(new Path("/user/hadoop/core-site.xml"));
        configx.addResource(new Path("/user/hadoop/hdfs-site.xml"));

        // Create MapReduce job
        Job devxmapjob = new Job(configx, "DevXDriver.class");
        devxmapjob.setJarByClass(DevXDriver.class);
        devxmapjob.setJobName("DevX MapReduce Job");

        // Set output key and value classes
        devxmapjob.setOutputKeyClass(Text.class);
        devxmapjob.setOutputValueClass(Text.class);

        // Set Map class
        devxmapjob.setMapperClass(DevXMap.class);

        // Set Combiner class
        devxmapjob.setCombinerClass(DevXReducer.class);

        // Set Reducer class
        devxmapjob.setReducerClass(DevXReducer.class);

        // Set Map output key and value classes
        devxmapjob.setMapOutputKeyClass(Text.class);
        devxmapjob.setMapOutputValueClass(Text.class);

        // Set number of reducer tasks
        devxmapjob.setNumReduceTasks(10);

        // Set input and output format classes
        devxmapjob.setInputFormatClass(TextInputFormat.class);
        devxmapjob.setOutputFormatClass(TextOutputFormat.class);

        // Set input and output paths
        FileInputFormat.addInputPath(devxmapjob, new Path("/user/map_reduce/input/"));
        FileOutputFormat.setOutputPath(devxmapjob, new Path("/user/map_reduce/output"));

        // Start MapReduce job and wait for it to finish
        devxmapjob.waitForCompletion(true);
    }
}
[/code]
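
Two notes on the driver above. First, the new Job(configx, ...) constructor is deprecated in Hadoop 2.x and later; on a newer release the job would typically be created through the static factory instead, as in this minimal sketch (same configuration and class names as above):

[code]
// Sketch: equivalent job creation on Hadoop 2.x+, replacing "new Job(...)"
Configuration configx = new Configuration();
Job devxmapjob = Job.getInstance(configx, "DevX MapReduce Job");
devxmapjob.setJarByClass(DevXDriver.class);
[/code]

Second, the driver registers DevXReducer as both the combiner and the reducer. A combiner may be run zero or more times on the map side, so it should be free of side effects and emit the same key-value types it consumes; since DevXReducer writes files to HDFS, it is safer to omit the setCombinerClass() call unless a dedicated, side-effect-free combiner is available.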

Map function: This is responsible for splitting the data into key-value pairs. This is known as the mapping of the data.

Listing 2: The Map function splitting the data into chunks

[code]
package com.mapreduce.techalpine;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author kaushik pal
 *
 * This is the map process. It does the mapping for the keyword-value pair.
 */
public class DevXMap extends Mapper<LongWritable, Text, Text, Text> {

    // Create Path, BufferedReader and Text variables
    Path file_path;
    BufferedReader buffer_reader;
    Text tweet_values = new Text();

    /**
     * @param key
     * @param value
     * @param context
     */
    public void map(LongWritable key, Text value, Context context) {
        try {
            // Create configuration for Map
            Configuration map_config = new Configuration();

            // Load Hadoop core files in configuration
            map_config.addResource(new Path("/user/hadoop/core-site.xml"));
            map_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));

            // Create variables
            String searchkeyword = "";

            // Open file from the file path
            file_path = new Path("files/repository/keys.txt");
            FileSystem file_system = FileSystem.get(URI.create("files/repository/keys.txt"), new Configuration());

            // Load buffer reader and read the search keyword
            buffer_reader = new BufferedReader(new InputStreamReader(file_system.open(file_path)));
            while (buffer_reader.ready()) {
                searchkeyword = buffer_reader.readLine().trim();
            }

            // Get key value
            final Text key_value = new Text(searchkeyword);

            // Check value and take decision
            if (value == null) {
                return;
            } else {
                StringTokenizer string_tokens = new StringTokenizer(value.toString(), ",");
                int count = 0;
                while (string_tokens.hasMoreTokens()) {
                    String token = string_tokens.nextToken();
                    count++;
                    // Skip the first field of the comma-separated record
                    if (count <= 1)
                        continue;
                    String new_tweet_value = token.toLowerCase().trim().replaceAll("\\*", "");
                    if (new_tweet_value.contains(searchkeyword.toLowerCase().trim())) {
                        tweet_values.set(new_tweet_value);
                        context.write(key_value, tweet_values);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
[/code]
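
A design note on the Map function above: it re-reads the keyword file from HDFS on every map() call, which is expensive. A common alternative, sketched below under the assumption of the Hadoop 2.x API (the paths are the same ones used in the listings, and searchkeyword is assumed to be promoted to an instance field), is to ship the file to each task through the distributed cache and read it once in setup():

[code]
// Sketch: distribute keys.txt to the task nodes and read it once per mapper.

// In DevXDriver, before submitting the job (the "#keys.txt" fragment sets the
// local symlink name in the task's working directory):
devxmapjob.addCacheFile(new URI("/user/map_reduce/keys.txt#keys.txt"));

// In DevXMap, load the keyword once instead of inside map():
@Override
protected void setup(Context context) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader("keys.txt"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            searchkeyword = line.trim();   // instance field instead of a local variable
        }
    }
}
[/code]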

Reduce function: This is responsible for aggregating the data. The aggregation is done based on the key values. After processing and sorting, the aggregation is completed and the result is sent back to the client program.

Listing 3: The Reduce function aggregating the processed data

[code]
package com.mapreduce.techalpine;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.RandomAccess;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author kaushik pal
 *
 * This is the reducer function. It aggregates the output based on the
 * sorting of key-value pairs.
 */
public class DevXReducer extends Reducer<Text, Text, Text, Text> {

    // Create variables for file paths
    Path positive_file_path;
    Path negative_file_path;
    Path output_file_path;
    Path keyword_file_path;

    // Create variables for buffer readers
    BufferedReader positive_buff_reader;
    BufferedReader negative_buff_reader;
    BufferedReader keyword_buff_reader;

    // Create variables for calculation
    static Double total_record_count = new Double("0");
    static Double count_neg = new Double("0");
    static Double count_pos = new Double("0");
    static Double count_neu = new Double("0");
    static Double percent_neg = new Double("0");
    static Double percent_pos = new Double("0");
    static Double percent_neu = new Double("0");

    Pattern pattrn_matcher;
    Matcher matcher_txt;
    static int new_row = 0;
    FSDataOutputStream out_1st, out_2nd;

    /**
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // Create configuration for reducer
        Configuration reduce_config = new Configuration();

        // Load Hadoop config files
        reduce_config.addResource(new Path("/user/hadoop/core-site.xml"));
        reduce_config.addResource(new Path("/user/hadoop/hdfs-site.xml"));

        // Create variables
        String key_word = "";

        keyword_file_path = new Path("files/repository/keys.txt");
        FileSystem file_system_read = FileSystem.get(URI.create("files/repository/keys.txt"), new Configuration());
        keyword_buff_reader = new BufferedReader(new InputStreamReader(file_system_read.open(keyword_file_path)));

        FileSystem get_filesys = FileSystem.get(reduce_config);
        FileSystem get_filesys_posneg = FileSystem.get(reduce_config);

        Path path_output = new Path("/user/sentiment_output_file.txt");
        Path path_output_posneg = new Path("/user/posneg_output_file.txt");

        // Get keyword
        while (keyword_buff_reader.ready()) {
            key_word = keyword_buff_reader.readLine().trim();
        }

        // Keyword to compare against the reduce key
        String check_keyword = key_word;

        // Check file system
        if (!get_filesys.exists(path_output)) {
            out_1st = get_filesys.create(path_output);
            out_2nd = get_filesys_posneg.create(path_output_posneg);
        }

        // Check keyword matching using positive and negative dictionaries
        if (check_keyword.equals(key.toString().toLowerCase())) {
            for (Text new_tweets : values) {

                // Load positive word dictionary
                positive_file_path = new Path("/user/map_reduce/pos_words.txt");
                FileSystem filesystem_one = FileSystem.get(URI.create("files/pos_words.txt"), new Configuration());
                positive_buff_reader = new BufferedReader(new InputStreamReader(filesystem_one.open(positive_file_path)));

                // Load negative word dictionary
                negative_file_path = new Path("/user/map_reduce/neg_words.txt");
                FileSystem filesystem_two = FileSystem.get(URI.create("files/neg_words.txt"), new Configuration());
                negative_buff_reader = new BufferedReader(new InputStreamReader(filesystem_two.open(negative_file_path)));

                ++total_record_count;

                boolean first_flag = false;
                boolean second_flag = false;

                String all_tweets = new_tweets.toString();
                String first_regex = "";
                String second_regex = "";

                // Match the tweet against the positive word dictionary
                while (positive_buff_reader.ready()) {
                    first_regex = positive_buff_reader.readLine().trim();
                    new_row++;
                    pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    first_flag = matcher_txt.find();

                    if (first_flag) {
                        out_2nd.writeBytes(all_tweets);
                        context.write(new Text(first_regex), new Text(all_tweets));
                        break;
                    }
                }

                // Match the tweet against the negative word dictionary
                while (negative_buff_reader.ready()) {
                    new_row++;
                    second_regex = negative_buff_reader.readLine().trim();
                    pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);
                    matcher_txt = pattrn_matcher.matcher(all_tweets);
                    second_flag = matcher_txt.find();

                    if (second_flag) {
                        out_2nd.writeBytes(all_tweets);
                        context.write(new Text(second_regex), new Text(all_tweets));
                        break;
                    }
                }

                // Classify the record as positive, negative or neutral
                if (first_flag && second_flag) {
                    ++count_neu;
                } else {
                    if (first_flag) {
                        ++count_pos;
                    }
                    if (second_flag) {
                        ++count_neg;
                    }
                    if (!first_flag && !second_flag) {
                        ++count_neu;
                    }
                }

                // Close buffer readers
                negative_buff_reader.close();
                positive_buff_reader.close();
            }

            // Calculate percentage values
            percent_pos = count_pos / total_record_count * 100;
            percent_neg = count_neg / total_record_count * 100;
            percent_neu = count_neu / total_record_count * 100;

            try {
                // Write to the output file
                out_1st.writeBytes("\n" + key_word);
                out_1st.writeBytes("," + total_record_count);
                out_1st.writeBytes("," + percent_neg);
                out_1st.writeBytes("," + percent_pos);
                out_1st.writeBytes("," + percent_neu);

                // Close the file systems
                out_1st.close();
                get_filesys.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
[/code]

Conclusion: In this article I have discussed MapReduce processing using the Java programming environment. The different components, such as the Map and Reduce functions, perform the main task and return the output to the client. The processing performs efficiently only in a distributed environment, so you should set up the Apache Hadoop framework in a distributed environment to get the best results.

Hope you have enjoyed the article and will be able to apply it in your own programming practice. Keep reading.
