Steps to write Map Reduce program using Hadoop framework and java

MapReduce

MapReduce

Visão global: Big data é um novo paradigma e seu processamento é a área mais importante para se concentrar. Big data refere-se ao grande volume de dados que vem de diferentes fontes, como a mídia social, dados do sensor, Dados móveis, dados de internet e muitos mais. Neste artigo, vamos concentrar-se na parte de processamento usando o framework Hadoop e Map-Reduce programação. Map-Reduce pode ser definida como um tipo especial de estrutura de programação usada para processar grande quantidade de dados em um quadro chamado de hardware commodity distribuídos. Assim, o artigo irá descrever as Map-Reduce conceitos e sua aplicação prática utilizando código Java.

Introdução: Hadoop MapReduce pode ser definido como uma estrutura de programação software usado para processar grande volume de dados (em nível terabyte) no ambiente paralelo de nós em cluster. O cluster é composto por milhares de nós de hardware commodity. O processamento é distribuído, tolerante a falhas e fiável. Um trabalho MapReduce típica é realizada de acordo com os seguintes passos

1) Dividir os dados em pedaços independentes baseados em key-value pair. Isso é feito por tarefa Mapa de forma paralela.

2) O posto para fora do trabalho Mapa é classificado com base nos valores-chave.

3) A saída ordenada é a entrada para o trabalho Reduzir. E, em seguida, que produz o resultado final para o processamento e devolve o resultado para o cliente.

Nota: hardware commodity é basicamente sistemas de computadores de baixo custo

quadro MapReduce: framework Apache Hadoop MapReduce é escrito em Java. A estrutura consiste de configuração master-slave. O mestre é conhecido como JobTracker e os escravos são conhecidos como TaskTrackers.The principal controla a tarefa processadas em escravos (que são nada, mas os nós em um cluster). O cálculo é feito sobre os escravos. Assim, os computação e armazenamentos nós são o mesmo em um ambiente de cluster. O conceito é ‘ mover o cálculo para os nódulos onde os dados são armazenados ', e torna o processamento mais rápido.

MapReduce Processing: O modelo de quadro MapReduce é muito leve. Assim, o custo do hardware é baixo quando comparado com outros enquadramentos. Mas, ao mesmo tempo, devemos entender que o modelo funciona de forma eficiente apenas em um ambiente distribuído como o processamento é feito em nós onde reside a dados. As outras características como escalabilidade, tolerância confiabilidade e falha também funciona bem em ambiente distribuído.

implementação MapReduce: Agora vamos discutir sobre a implementação do modelo MapReduce usando a plataforma de programação Java. Seguem-se os diferentes componentes de toda a ponta a ponta implementação.

  • The programa de cliente que é a classe condutor e inicia o processo de
  • The função de mapa que realiza a separação utilizando o par de key-value.
  • The função de reduzir que agrega os dados processados ​​e enviar a saída de volta para o cliente.

classe do driver: Segue-se uma classe de driver que liga o Mapa e função Reduzir e inicia o processamento. Este é o programa de cliente que inicia o processo de.

Listing1: O programa cliente (classe do driver) iniciar o processo de

[código]

com.mapreduce.techalpine pacote;

importação org.apache.commons.logging.Log;

importação org.apache.commons.logging.LogFactory;

importação org.apache.hadoop.conf.Configuration;

importação org.apache.hadoop.fs.Path;

importação org.apache.hadoop.io.Text;

org.apache.hadoop.mapreduce.Job importação;

org.apache.hadoop.mapreduce.lib.input.FileInputFormat importação;

org.apache.hadoop.mapreduce.lib.input.TextInputFormat importação;

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importação;

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat importação;

/**

* @author kaushik pal

*

* Esta é a principal classe de driver para iniciar o mapa-reduce

* process. Ele define o terreno de volta para todo o processo e

* Em seguida, inicia-lo.

*/

DevXDriver classe pública {

public static void main(Corda[] args) throws Exception {

// iniciar a configuração

configx = new Configuration();

// Adicionar arquivos de recursos

configx.addResource(novo Caminho(“/user / hadoop / core-site.xml”));

configx.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));

// Criar trabalho MapReduce

devxmapjob Job = new Job(configx,”DevXDriver.class”);

devxmapjob.setJarByClass(DevXDriver.class);

devxmapjob.setJobName(“DevX MapReduce Job”);

// Definir a saída kay e classe de valor

devxmapjob.setOutputKeyClass(Text.class);

devxmapjob.setOutputValueClass(Text.class);

// Set classe Map

devxmapjob.setMapperClass(DevXMap.class);

// Definir classe Combiner

devxmapjob.setCombinerClass(DevXReducer.class);

// Set classe Redutor

devxmapjob.setReducerClass(DevXReducer.class);

// Definir Mapa chave de saída e valor aulas

devxmapjob.setMapOutputKeyClass(Text.class);

devxmapjob.setMapOutputValueClass(Text.class);

// Defina o número de tarefas redutor

devxmapjob.setNumReduceTasks(10);

// Definir as classes de formato de entrada e de saída

devxmapjob.setInputFormatClass(TextInputFormat.class);

devxmapjob.setOutputFormatClass(TextOutputFormat.class);

// Conjunto de entrada e passagem de saída

FileInputFormat.addInputPath(devxmapjob, novo Caminho(“/user / map_reduce / input /”));

FileOutputFormat.setOutputPath(devxmapjob,novo Caminho(“/user / map_reduce / saída”));

// Iniciar o trabalho de MapReduce

devxmapjob.waitForCompletion(verdadeiro);

}

}

[/código]

função de mapa: Este é responsável pela divisão dos dados com base no par de chaves de valor. Isto é conhecido como mapeamento de dados.

Listing2: Esta é uma função Mapa dividindo os dados em pedaços

[código]

com.mapreduce.techalpine pacote;

importação java.io.BufferedReader;

importação java.io.InputStreamReader;

importação java.net.URI;

importação java.util.StringTokenizer;

importação org.apache.commons.logging.Log;

importação org.apache.commons.logging.LogFactory;

importação org.apache.hadoop.conf.Configuration;

importação org.apache.hadoop.fs.FileSystem;

importação org.apache.hadoop.fs.Path;

importação org.apache.hadoop.io.LongWritable;

importação org.apache.hadoop.io.Text;

importação org.apache.hadoop.mapreduce.Mapper;

/**

* @author kaushik pal

*

* Este é o processo de mapa. Ele faz o mapeamento para palavras-value pair.

*/

classe pública DevXMap estende Mapper<LongWritable, Texto, Texto,Texto> {

// Criar o caminho, variáveis ​​BufferedReader e texto

file_path path;

BufferedReader buffer_reader;

tweet_values ​​texto = new Texto();

/**

* chave @param

* valor @param

* @param contexto

*/

mapa public void(chave LongWritable, valor de texto, contexto contexto) {

tentar{

// Criar configuração para Mapa

map_config = new Configuration();

// Carregar arquivos de núcleo do Hadoop na configuração

map_config.addResource(novo Caminho(“/user / hadoop / core-site.xml”));

map_config.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));

// Criar variáveis

Cordas PesquisaPalavra = “”;

// Abrir arquivo do caminho de arquivo

file_path = new Path(“files / repositório / keys.txt”);

FileSystem file_system = FileSystem.get(URI.create(“files / repositório / keys.txt”),nova configuração());

// leitor de tampão de carga

buffer_reader = new BufferedReader(nova InputStreamReader(file_system.open(file_path)));

while(buffer_reader.ready())

{

PesquisaPalavra = buffer_reader.readLine().aparar();

}

// Obter valor de chave

key_value texto final = novo texto(Pesquisar palavra-chave);

// Verifique o valor e tomar uma decisão

se(valor == null)

{

return;

}

else{

StringTokenizer string_tokens = new StringTokenizer(Value.ToString(),”,”);

int count = 0;

while(string_tokens.hasMoreTokens()) {

contagem ++;

se(contagem <1 =)

continuar;

Cordas new_tweet_value = string_tokens.nextToken().toLowerCase().aparar().substitua tudo(“\\*”,””);

se(new_tweet_value.contains(searchkeyword.toLowerCase().aparar())) {

tweet_values.set(new_tweet_value);

context.write(valor chave,tweet_values);

}

}

}

}

pegar(Exception e){

e.printStackTrace();

}

}

}

[/código]

função de reduzir: Este é responsável pela agregação dos dados. A agregação é feito com base nos valores principais. Então, depois de processamento e triagem a agregação concluído e envia o resultado de volta para o programa de cliente.

Listing3: A função Reduzir agrega os dados processados

[código]

com.mapreduce.techalpine pacote;

importação java.io.BufferedReader;

import java.io.IOException;

importação java.io.InputStreamReader;

importação java.net.URI;

importação java.util.RandomAccess;

importação java.util.regex.Matcher;

importação java.util.regex.Pattern;

importação org.apache.commons.logging.Log;

importação org.apache.commons.logging.LogFactory;

importação org.apache.hadoop.conf.Configuration;

importação org.apache.hadoop.fs.FSDataOutputStream;

importação org.apache.hadoop.fs.FileSystem;

importação org.apache.hadoop.fs.Path;

importação org.apache.hadoop.io.Text;

importação org.apache.hadoop.mapreduce.Reducer;

/**

* @author kaushik pal

*

* Esta é a função redutor. Ele agrega a saída com base no

* triagem de pares de valor-chave.

*/

classe pública DevXReducer estende Redutor<Texto ,Texto,Texto,Texto>

{

// Criar variáveis ​​para caminho de arquivo

positive_file_path path;

negative_file_path path;

output_file_path path;

keyword_file_path path;

// Criar variáveis ​​para tampão

BufferedReader positive_buff_reader;

BufferedReader negative_buff_reader;

BufferedReader keyword_buff_reader;

// Criar variáveis ​​para o cálculo

total_record_count Duplo estática = new Duplo(“0”);

count_neg Duplo estática = new Duplo(“0”);

count_pos static double = new Duplo(“0”);

count_neu Duplo estática = new Duplo(“0”);

percent_neg Duplo estática = new Duplo(“0”);

percent_pos static double = new Duplo(“0”);

percent_neu Duplo estática = new Duplo(“0”);

pattrn_matcher Pattern;

partidas matcher_txt;

static int NEW_ROW = 0;

FSDataOutputStream out_1st,out_2nd;

/**

* chave @param

* valores @param

* @param contexto

* @throws IOException

* @throws InterruptedException

*/

public void reduzir(chave de texto, Iterable<Texto> valores,contexto contexto) lança IOException, InterruptedException

{

// Criar configuração para redutor

reduce_config = new Configuration();

// arquivos de configuração de carga do Hadoop

reduce_config.addResource(novo Caminho(“/user / hadoop / core-site.xml”));

reduce_config.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));

// Criar variáveis

Cordas key_word = “”;

Cordas check_keyword = key_word;

keyword_file_path = new Path(“files / repositório / keys.txt”);

FileSystem file_system_read = FileSystem.get(URI.create(“files / repositório / keys.txt”),nova configuração());

keyword_buff_reader = new BufferedReader(nova InputStreamReader(file_system_read.open(keyword_file_path)));

FileSystem get_filesys = FileSystem.get(reduce_config);

FileSystem get_filesys_posneg = FileSystem.get(reduce_config);

Caminho path_output = new Path(“/user / sentiment_output_file.txt”);

Caminho path_output_posneg = new Path(“/user / posneg_output_file.txt”);

// obter palavra-chave

while(keyword_buff_reader.ready())

{

key_word = keyword_buff_reader.readLine().aparar();

}

// Verifique o sistema de arquivos

se (!get_filesys.exists(path_output)) {

out_1st = get_filesys.create(path_output);

out_2nd = get_filesys_posneg.create(path_output_posneg);

}

// Verifique correspondência de palavra-chave usando dicionários positivos e negativos

se(check_keyword.equals(key.toString().toLowerCase()))

{

para(new_tweets texto:valores)

{

// Coloque dicionário palavra positiva

positive_file_path = new Path(“/user / map_reduce / pos_words.txt”);

FileSystem filesystem_one = FileSystem.get(URI.create(“files / pos_words.txt”),nova configuração());

positive_buff_reader = new BufferedReader(nova InputStreamReader(filesystem_one.open(positive_file_path)));

// Coloque disctinary palavra negativa

negative_file_path = new Path(“/user / map_reduce / neg_words.txt”);

FileSystem filesystem_two = FileSystem.get(URI.create(“files / neg_words.txt”),nova configuração());

negative_buff_reader = new BufferedReader(nova InputStreamReader(filesystem_two.open(negative_file_path)));

++total_record_count;

boolean first_flag = false;

boolean second_flag = false;

Cordas all_tweets = new_tweets.toString();

Cordas first_regex = “”;

Cordas second_regex = “”;

while(positive_buff_reader.ready())

{

first_regex = positive_buff_reader.readLine().aparar();

NEW_ROW ;

pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

first_flag = matcher_txt.find();

 

se(first_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(nOVO TEXTO(first_regex),nOVO TEXTO(all_tweets));

pausa;

}

}

while(negative_buff_reader.ready())

{

NEW_ROW ;

second_regex = negative_buff_reader.readLine().aparar();

pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

second_flag = matcher_txt.find();

se(second_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(nOVO TEXTO(second_regex),nOVO TEXTO(all_tweets));

pausa;

}

}

se(first_flag&second_flag)

{

++count_neu;

}

else

{

se(first_flag)

{

++count_pos;

}

se(second_flag)

{

++count_neg;

}

se(first_flag == false&second_flag == false)

{

++count_neu;

}

}

// Fechar buffers

negative_buff_reader.close();

positive_buff_reader.close();

}

// Calcule os valores percentuais

percent_pos = count_pos / total_record_count * 100;

percent_neg = count_neg / total_record_count * 100;

percent_neu = count_neu / total_record_count * 100;

tentar{

// Escreva para os arquivos

out_1st.writeBytes(“\n”+key_word);

out_1st.writeBytes(“,”+total_record_count);

out_1st.writeBytes(“,”+percent_neg);

out_1st.writeBytes(“,”+percent_pos);

out_1st.writeBytes(“,”+percent_neu);

// Fechar sistemas de arquivos

out_1st.close();

get_filesys.close();

}pegar(Exception e){

e.printStackTrace();

}

}

}

}

[/código]

Conclusão: Neste artigo, discutimos o processamento de MapReduce usando o ambiente de programação Java. O componente diferente, como Mapa e reduzir a função executar a tarefa principal e retorna a saída para o cliente. O processamento realiza eficientemente em ambiente distribuído apenas. Assim, devemos definir o quadro Apache Hadoop em um ambiente distribuído para obter o melhor resultado.

Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.

Tagged on: ,
============================================= ============================================== Buy best TechAlpine Books on Amazon
============================================== ---------------------------------------------------------------- electrician ct chestnutelectric
error

Enjoy this blog? Please spread the word :)

Follow by Email
LinkedIn
LinkedIn
Share