Steps to write Map Reduce program using Hadoop framework and java

MapReduce

MapReduce

Visión de conjunto: Big data es un nuevo paradigma y su procesamiento es el área más importante concentrarse. grandes volúmenes de datos se refiere a la gran cantidad de datos que proviene de diversas fuentes como las redes sociales, datos del sensor, datos móviles, de datos de Internet y muchos más. En este artículo nos concentraremos en la parte de procesamiento utilizando el framework Hadoop y Mapa-Reducir la programación. Map-Reduce puede definirse como un tipo especial de marco de programación utilizado para procesar gran cantidad de datos en un marco llamado hardware comercial distribuida. Por lo que el artículo se describen los Mapa-Reducir los conceptos y su aplicación práctica utilizando código Java.

Introducción: Hadoop MapReduce puede ser definida como un marco de programación de software utilizado para procesar gran volumen de datos (en el nivel terabyte) en entorno paralelo de nodos agrupados. El clúster se compone de miles de nodos de hardware de consumo. El procesamiento se distribuye, tolerante a fallos y fiable. Un trabajo MapReduce típico se realiza de acuerdo a los siguientes pasos

1) Dividir los datos en bloques independientes con base en par clave-valor. Esto se hace mediante la tarea Mapa de una manera paralela.

2) El hacia fuera puesto de trabajo Mapa se ordenan en función de los valores clave.

3) La salida clasificada es la entrada al puesto de trabajo Reducir. Y entonces se produce la salida final al tratamiento y devuelve el resultado al cliente.

Nota: hardware básico es básicamente sistemas informáticos de bajo coste

marco MapReduce: Hadoop MapReduce marco Apache está escrito en Java. El marco consiste en configuración maestro-esclavo. El maestro se conoce como JobTracker y son conocidos como los esclavos TaskTrackers.The maestro controla la tarea procesada en los esclavos (que son nada más que los nodos de un clúster). El cálculo se realiza sobre los esclavos. Por lo que los nodos de cómputo y almacenes son los mismos en un entorno agrupado. El concepto es ‘ mover el cálculo de los nodos en los que los datos se almacenan ', y hace que el procesamiento más rápido.

Procesamiento de MapReduce: El modelo de marco MapReduce es muy ligero. Por lo que el costo del hardware es baja en comparación con otros marcos. Pero al mismo tiempo debemos entender que el modelo funciona de manera eficiente sólo en un entorno distribuido como el procesamiento se realiza en los nodos de datos en el que reside. Las otras características como la escalabilidad, fiabilidad y tolerancia a fallos también funciona bien en el entorno distribuido.

implementación de MapReduce: Ahora vamos a discutir sobre la aplicación del modelo MapReduce usando la plataforma de programación Java. Los siguientes son los diferentes componentes de todo el extremo para poner fin a la ejecución.

  • La programa cliente que es la clase del controlador e inicia el proceso de
  • La función de mapa los cuales realiza la división utilizando el par clave-valor.
  • La Reducir la función el que agregar los datos procesados ​​y enviar el resultado al cliente.

clase del controlador: Lo que sigue es una clase de controlador que se une el mapa y la función Reducir e inicia el procesamiento. Este es el programa cliente que inicia el proceso.

Listing1: El programa de cliente (clase del controlador) iniciar el proceso de

[código]

com.mapreduce.techalpine paquete;

importación org.apache.commons.logging.Log;

importación org.apache.commons.logging.LogFactory;

importación org.apache.hadoop.conf.Configuration;

importación org.apache.hadoop.fs.Path;

importación org.apache.hadoop.io.Text;

org.apache.hadoop.mapreduce.Job importación;

org.apache.hadoop.mapreduce.lib.input.FileInputFormat importación;

org.apache.hadoop.mapreduce.lib.input.TextInputFormat importación;

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importación;

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat importación;

/**

* @author kaushik pal

*

* Esta es la clase principal motor para iniciar el mapa-a reducir

* process. Se coloca la parte de atrás de todo el proceso y

* A continuación, lo inicia.

*/

DevXDriver clase pública {

public static void main(Cadena[] args) throws Exception {

// iniciar la configuración

configX = new Configuration();

// Añadir archivos de recursos

configx.addResource(nuevo camino(“/usuario / hadoop / núcleo-site.xml”));

configx.addResource(nuevo camino(“/usuario / hadoop / hdfs-site.xml”));

// Crear trabajo MapReduce

devxmapjob Trabajo = trabajo nuevo(configX,”DevXDriver.class”);

devxmapjob.setJarByClass(DevXDriver.class);

devxmapjob.setJobName(“DevX MapReduce Trabajo”);

// Ajuste de salida Kay y clase de valor

devxmapjob.setOutputKeyClass(Text.class);

devxmapjob.setOutputValueClass(Text.class);

// Conjunto de clases Mapa

devxmapjob.setMapperClass(DevXMap.class);

// Conjunto de clase combinador

devxmapjob.setCombinerClass(DevXReducer.class);

// Reductor conjunto de clases

devxmapjob.setReducerClass(DevXReducer.class);

// Establecer el mapa de claves producción y el valor clases

devxmapjob.setMapOutputKeyClass(Text.class);

devxmapjob.setMapOutputValueClass(Text.class);

// Establecer el número de tareas reductor

devxmapjob.setNumReduceTasks(10);

// Conjunto de entrada y salida clases de formato

devxmapjob.setInputFormatClass(TextInputFormat.class);

devxmapjob.setOutputFormatClass(TextOutputFormat.class);

// Conjunto de entrada y salida de ruta

FileInputFormat.addInputPath(devxmapjob, nuevo camino(“/usuario / map_reduce / input /”));

FileOutputFormat.setOutputPath(devxmapjob,nuevo camino(“/usuario / map_reduce / salida”));

// Iniciar el trabajo de MapReduce

devxmapjob.waitForCompletion(verdadero);

}

}

[/código]

función de mapa: Este es el responsable de la división de los datos basado en el par clave-valor. Esto se conoce como el mapeo de los datos.

LISTING2: Esta es una función Mapa dividiendo los datos en trozos

[código]

com.mapreduce.techalpine paquete;

importación java.io.BufferedReader;

importación java.io.InputStreamReader;

importación java.net.URI;

importación java.util.StringTokenizer;

importación org.apache.commons.logging.Log;

importación org.apache.commons.logging.LogFactory;

importación org.apache.hadoop.conf.Configuration;

importación org.apache.hadoop.fs.FileSystem;

importación org.apache.hadoop.fs.Path;

importación org.apache.hadoop.io.LongWritable;

importación org.apache.hadoop.io.Text;

importación org.apache.hadoop.mapreduce.Mapper;

/**

* @author kaushik pal

*

* Este es el proceso de la hoja. Se hace el mapeo de la palabra clave-valor par.

*/

DevXMap clase pública se extiende Mapper<LongWritable, Texto, Texto,Texto> {

// crear trayectoria, las variables BufferedReader y texto

vía_acceso_archivo ruta;

buffer_reader BufferedReader;

tweet_values ​​texto = texto nuevo();

/**

* @param clave

* valor @param

* @param contexto

*/

mapa public void(clave LongWritable, valor de texto, contexto contexto) {

tratar de{

// Crear configuración de mapa

map_config = new Configuration();

// Cargar archivos de núcleo en la configuración de Hadoop

map_config.addResource(nuevo camino(“/usuario / hadoop / núcleo-site.xml”));

map_config.addResource(nuevo camino(“/usuario / hadoop / hdfs-site.xml”));

// crear variables

Cadena BúsquedaPalabra = “”;

// Abrir el archivo de la ruta del archivo

vía_acceso_archivo = nuevo camino(“archivos / repositorio / keys.txt”);

FileSystem sistema_archivos = FileSystem.get(URI.create(“archivos / repositorio / keys.txt”),nueva configuración());

// lector de tampón de carga

buffer_reader = new BufferedReader(nueva InputStreamReader(file_system.open(file_path)));

while(buffer_reader.ready())

{

BúsquedaPalabra = buffer_reader.readLine().recortar();

}

// Obtener valor de la clave

última key_value texto = texto nuevo(buscar la palabra clave);

// Compruebe el valor y tomar decisiones

si(valor == null)

{

return;

}

else{

StringTokenizer string_tokens = new StringTokenizer(value.toString(),”,”);

int cuenta = 0;

while(string_tokens.hasMoreTokens()) {

count ++;

si(count <= 1)

continuar;

Cadena new_tweet_value = string_tokens.nextToken().toLowerCase().recortar().reemplaza todo(“\\*”,””);

si(new_tweet_value.contains(searchkeyword.toLowerCase().recortar())) {

tweet_values.set(new_tweet_value);

context.write(valor clave,tweet_values);

}

}

}

}

coger(excepción de correo){

e.printStackTrace();

}

}

}

[/código]

Reducir la función: Este es el responsable de la agregación de los datos. La agregación se realiza en base a los valores de las claves. Así que después de procesamiento y clasificación de la agregación completado y envía el resultado al programa cliente.

Listing3: Reducir la función suma los datos de procesado

[código]

com.mapreduce.techalpine paquete;

importación java.io.BufferedReader;

importación java.io.IOException;

importación java.io.InputStreamReader;

importación java.net.URI;

importación java.util.RandomAccess;

importación java.util.regex.Matcher;

importación java.util.regex.Pattern;

importación org.apache.commons.logging.Log;

importación org.apache.commons.logging.LogFactory;

importación org.apache.hadoop.conf.Configuration;

importación org.apache.hadoop.fs.FSDataOutputStream;

importación org.apache.hadoop.fs.FileSystem;

importación org.apache.hadoop.fs.Path;

importación org.apache.hadoop.io.Text;

importación org.apache.hadoop.mapreduce.Reducer;

/**

* @author kaushik pal

*

* Esta es la función de reductor. Se agrega la salida basándose en el

* la clasificación de los pares de valores clave.

*/

DevXReducer clase pública se extiende Reductor<Texto ,Texto,Texto,Texto>

{

// Creación de variables para la ruta del archivo

positive_file_path ruta;

negative_file_path ruta;

output_file_path ruta;

keyword_file_path ruta;

// Creación de variables para búfer

positive_buff_reader BufferedReader;

negative_buff_reader BufferedReader;

BufferedReader keyword_buff_reader;

// Creación de variables para el cálculo

total_record_count Doble Doble estática = new(“0”);

count_neg Doble Doble estática = new(“0”);

count_pos dobles estáticas = new Doble(“0”);

count_neu Doble Doble estática = new(“0”);

percent_neg Doble Doble estática = new(“0”);

percent_pos dobles estáticas = new Doble(“0”);

percent_neu Doble Doble estática = new(“0”);

pattrn_matcher patrón;

partidos matcher_txt;

static int NEW_ROW = 0;

FSDataOutputStream out_1st,out_2nd;

/**

* @param clave

* Los valores @param

* @param contexto

* @throws IOException

* @throws InterruptedException

*/

public void reducir(clave de texto, iterable<Texto> values,contexto contexto) throws IOException, InterruptedException

{

// Crear configuración para reductor

reduce_config = new Configuration();

// Cargar archivos de configuración de Hadoop

reduce_config.addResource(nuevo camino(“/usuario / hadoop / núcleo-site.xml”));

reduce_config.addResource(nuevo camino(“/usuario / hadoop / hdfs-site.xml”));

// crear variables

Cadena key_word = “”;

Cadena check_keyword = key_word;

keyword_file_path = nuevo camino(“archivos / repositorio / keys.txt”);

FileSystem file_system_read = FileSystem.get(URI.create(“archivos / repositorio / keys.txt”),nueva configuración());

keyword_buff_reader = new BufferedReader(nueva InputStreamReader(file_system_read.open(keyword_file_path)));

FileSystem get_filesys = FileSystem.get(reduce_config);

FileSystem get_filesys_posneg = FileSystem.get(reduce_config);

Path = path_output nueva ruta(“/usuario / sentiment_output_file.txt”);

Path = path_output_posneg nueva ruta(“/usuario / posneg_output_file.txt”);

// Obtener palabra clave

while(keyword_buff_reader.ready())

{

key_word = keyword_buff_reader.readLine().recortar();

}

// Compruebe el sistema de archivos

si (!get_filesys.exists(path_output)) {

out_1st = get_filesys.create(path_output);

out_2nd = get_filesys_posneg.create(path_output_posneg);

}

// Compruebe concordancia de palabras clave usando los diccionarios positivos y negativos

si(check_keyword.equals(key.toString().toLowerCase()))

{

para(new_tweets de texto:values)

{

// Cargar el diccionario de la palabra positiva

positive_file_path = nuevo camino(“/usuario / map_reduce / pos_words.txt”);

FileSystem filesystem_one = FileSystem.get(URI.create(“archivos / pos_words.txt”),nueva configuración());

positive_buff_reader = new BufferedReader(nueva InputStreamReader(filesystem_one.open(positive_file_path)));

// Cargar palabra disctinary negativo

negative_file_path = nuevo camino(“/usuario / map_reduce / neg_words.txt”);

FileSystem filesystem_two = FileSystem.get(URI.create(“archivos / neg_words.txt”),nueva configuración());

negative_buff_reader = new BufferedReader(nueva InputStreamReader(filesystem_two.open(negative_file_path)));

++total_record_count;

first_flag = false;

second_flag = false;

Cadena all_tweets = new_tweets.toString();

Cadena first_regex = “”;

Cadena second_regex = “”;

while(positive_buff_reader.ready())

{

first_regex = positive_buff_reader.readLine().recortar();

NEW_ROW ;

pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

first_flag = matcher_txt.find();

 

si(first_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(Texto nuevo(first_regex),Texto nuevo(all_tweets));

descanso;

}

}

while(negative_buff_reader.ready())

{

NEW_ROW ;

second_regex = negative_buff_reader.readLine().recortar();

pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

second_flag = matcher_txt.find();

si(second_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(Texto nuevo(second_regex),Texto nuevo(all_tweets));

descanso;

}

}

si(first_flag&second_flag)

{

++count_neu;

}

else

{

si(first_flag)

{

++count_pos;

}

si(second_flag)

{

++count_neg;

}

si(first_flag == false&second_flag == false)

{

++count_neu;

}

}

// Cerrar tampones

negative_buff_reader.close();

positive_buff_reader.close();

}

// Calcular los valores de porcentaje

percent_pos = count_pos / total_record_count * 100;

percent_neg = count_neg / total_record_count * 100;

percent_neu = count_neu / total_record_count * 100;

tratar de{

// Escribir en los archivos

out_1st.writeBytes(“\n”+palabra clave);

out_1st.writeBytes(“,”+total_record_count);

out_1st.writeBytes(“,”+percent_neg);

out_1st.writeBytes(“,”+percent_pos);

out_1st.writeBytes(“,”+percent_neu);

// sistemas de archivos Cerrar

out_1st.close();

get_filesys.close();

}coger(excepción de correo){

e.printStackTrace();

}

}

}

}

[/código]

Conclusión: En este artículo he discutido el procesamiento MapReduce usando entorno de programación Java. El componente diferente como Mapa y reducir la función de realizar la tarea principal y devuelve el resultado al cliente. El procesamiento se realiza de manera eficiente en el entorno distribuido solamente. Así que debemos establecer el marco Apache Hadoop en un entorno distribuido para obtener el mejor resultado.

Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.

Etiquetado en:,
============================================= ============================================== Buy best TechAlpine Books on Amazon
============================================== ---------------------------------------------------------------- electrician ct chestnutelectric
error

Enjoy this blog? Please spread the word :)

Follow by Email
LinkedIn
LinkedIn
Share