Visão global: Big data é um novo paradigma e seu processamento é a área mais importante para se concentrar. Big data refere-se ao grande volume de dados que vem de diferentes fontes, como a mídia social, dados do sensor, Dados móveis, dados de internet e muitos mais. Neste artigo, vamos concentrar-se na parte de processamento usando o framework Hadoop e Map-Reduce programação. Map-Reduce pode ser definida como um tipo especial de estrutura de programação usada para processar grande quantidade de dados em um quadro chamado de hardware commodity distribuídos. Assim, o artigo irá descrever as Map-Reduce conceitos e sua aplicação prática utilizando código Java.
Introdução: Hadoop MapReduce pode ser definido como uma estrutura de programação software usado para processar grande volume de dados (em nível terabyte) no ambiente paralelo de nós em cluster. O cluster é composto por milhares de nós de hardware commodity. O processamento é distribuído, tolerante a falhas e fiável. Um trabalho MapReduce típica é realizada de acordo com os seguintes passos
1) Dividir os dados em pedaços independentes baseados em key-value pair. Isso é feito por tarefa Mapa de forma paralela.
2) O posto para fora do trabalho Mapa é classificado com base nos valores-chave.
3) A saída ordenada é a entrada para o trabalho Reduzir. E, em seguida, que produz o resultado final para o processamento e devolve o resultado para o cliente.
Nota: hardware commodity é basicamente sistemas de computadores de baixo custo
quadro MapReduce: framework Apache Hadoop MapReduce é escrito em Java. A estrutura consiste de configuração master-slave. O mestre é conhecido como JobTracker e os escravos são conhecidos como TaskTrackers.The principal controla a tarefa processadas em escravos (que são nada, mas os nós em um cluster). O cálculo é feito sobre os escravos. Assim, os computação e armazenamentos nós são o mesmo em um ambiente de cluster. O conceito é ‘ mover o cálculo para os nódulos onde os dados são armazenados ', e torna o processamento mais rápido.
MapReduce Processing: O modelo de quadro MapReduce é muito leve. Assim, o custo do hardware é baixo quando comparado com outros enquadramentos. Mas, ao mesmo tempo, devemos entender que o modelo funciona de forma eficiente apenas em um ambiente distribuído como o processamento é feito em nós onde reside a dados. As outras características como escalabilidade, tolerância confiabilidade e falha também funciona bem em ambiente distribuído.
implementação MapReduce: Agora vamos discutir sobre a implementação do modelo MapReduce usando a plataforma de programação Java. Seguem-se os diferentes componentes de toda a ponta a ponta implementação.
- The programa de cliente que é a classe condutor e inicia o processo de
- The função de mapa que realiza a separação utilizando o par de key-value.
- The função de reduzir que agrega os dados processados e enviar a saída de volta para o cliente.
classe do driver: Segue-se uma classe de driver que liga o Mapa e função Reduzir e inicia o processamento. Este é o programa de cliente que inicia o processo de.
Listing1: O programa cliente (classe do driver) iniciar o processo de
[código]
com.mapreduce.techalpine pacote;
importação org.apache.commons.logging.Log;
importação org.apache.commons.logging.LogFactory;
importação org.apache.hadoop.conf.Configuration;
importação org.apache.hadoop.fs.Path;
importação org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Job importação;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat importação;
org.apache.hadoop.mapreduce.lib.input.TextInputFormat importação;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importação;
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat importação;
/**
* @author kaushik pal
*
* Esta é a principal classe de driver para iniciar o mapa-reduce
* process. Ele define o terreno de volta para todo o processo e
* Em seguida, inicia-lo.
*/
DevXDriver classe pública {
public static void main(Corda[] args) throws Exception {
// iniciar a configuração
configx = new Configuration();
// Adicionar arquivos de recursos
configx.addResource(novo Caminho(“/user / hadoop / core-site.xml”));
configx.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));
// Criar trabalho MapReduce
devxmapjob Job = new Job(configx,”DevXDriver.class”);
devxmapjob.setJarByClass(DevXDriver.class);
devxmapjob.setJobName(“DevX MapReduce Job”);
// Definir a saída kay e classe de valor
devxmapjob.setOutputKeyClass(Text.class);
devxmapjob.setOutputValueClass(Text.class);
// Set classe Map
devxmapjob.setMapperClass(DevXMap.class);
// Definir classe Combiner
devxmapjob.setCombinerClass(DevXReducer.class);
// Set classe Redutor
devxmapjob.setReducerClass(DevXReducer.class);
// Definir Mapa chave de saída e valor aulas
devxmapjob.setMapOutputKeyClass(Text.class);
devxmapjob.setMapOutputValueClass(Text.class);
// Defina o número de tarefas redutor
devxmapjob.setNumReduceTasks(10);
// Definir as classes de formato de entrada e de saída
devxmapjob.setInputFormatClass(TextInputFormat.class);
devxmapjob.setOutputFormatClass(TextOutputFormat.class);
// Conjunto de entrada e passagem de saída
FileInputFormat.addInputPath(devxmapjob, novo Caminho(“/user / map_reduce / input /”));
FileOutputFormat.setOutputPath(devxmapjob,novo Caminho(“/user / map_reduce / saída”));
// Iniciar o trabalho de MapReduce
devxmapjob.waitForCompletion(verdadeiro);
}
}
[/código]
função de mapa: Este é responsável pela divisão dos dados com base no par de chaves de valor. Isto é conhecido como mapeamento de dados.
Listing2: Esta é uma função Mapa dividindo os dados em pedaços
[código]
com.mapreduce.techalpine pacote;
importação java.io.BufferedReader;
importação java.io.InputStreamReader;
importação java.net.URI;
importação java.util.StringTokenizer;
importação org.apache.commons.logging.Log;
importação org.apache.commons.logging.LogFactory;
importação org.apache.hadoop.conf.Configuration;
importação org.apache.hadoop.fs.FileSystem;
importação org.apache.hadoop.fs.Path;
importação org.apache.hadoop.io.LongWritable;
importação org.apache.hadoop.io.Text;
importação org.apache.hadoop.mapreduce.Mapper;
/**
* @author kaushik pal
*
* Este é o processo de mapa. Ele faz o mapeamento para palavras-value pair.
*/
classe pública DevXMap estende Mapper<LongWritable, Texto, Texto,Texto> {
// Criar o caminho, variáveis BufferedReader e texto
file_path path;
BufferedReader buffer_reader;
tweet_values texto = new Texto();
/**
* chave @param
* valor @param
* @param contexto
*/
mapa public void(chave LongWritable, valor de texto, contexto contexto) {
tentar{
// Criar configuração para Mapa
map_config = new Configuration();
// Carregar arquivos de núcleo do Hadoop na configuração
map_config.addResource(novo Caminho(“/user / hadoop / core-site.xml”));
map_config.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));
// Criar variáveis
Cordas PesquisaPalavra = “”;
// Abrir arquivo do caminho de arquivo
file_path = new Path(“files / repositório / keys.txt”);
FileSystem file_system = FileSystem.get(URI.create(“files / repositório / keys.txt”),nova configuração());
// leitor de tampão de carga
buffer_reader = new BufferedReader(nova InputStreamReader(file_system.open(file_path)));
while(buffer_reader.ready())
{
PesquisaPalavra = buffer_reader.readLine().aparar();
}
// Obter valor de chave
key_value texto final = novo texto(Pesquisar palavra-chave);
// Verifique o valor e tomar uma decisão
se(valor == null)
{
return;
}
else{
StringTokenizer string_tokens = new StringTokenizer(Value.ToString(),”,”);
int count = 0;
while(string_tokens.hasMoreTokens()) {
contagem ++;
se(contagem <1 =)
continuar;
Cordas new_tweet_value = string_tokens.nextToken().toLowerCase().aparar().substitua tudo(“\\*”,””);
se(new_tweet_value.contains(searchkeyword.toLowerCase().aparar())) {
tweet_values.set(new_tweet_value);
context.write(valor chave,tweet_values);
}
}
}
}
pegar(Exception e){
e.printStackTrace();
}
}
}
[/código]
função de reduzir: Este é responsável pela agregação dos dados. A agregação é feito com base nos valores principais. Então, depois de processamento e triagem a agregação concluído e envia o resultado de volta para o programa de cliente.
Listing3: A função Reduzir agrega os dados processados
[código]
com.mapreduce.techalpine pacote;
importação java.io.BufferedReader;
import java.io.IOException;
importação java.io.InputStreamReader;
importação java.net.URI;
importação java.util.RandomAccess;
importação java.util.regex.Matcher;
importação java.util.regex.Pattern;
importação org.apache.commons.logging.Log;
importação org.apache.commons.logging.LogFactory;
importação org.apache.hadoop.conf.Configuration;
importação org.apache.hadoop.fs.FSDataOutputStream;
importação org.apache.hadoop.fs.FileSystem;
importação org.apache.hadoop.fs.Path;
importação org.apache.hadoop.io.Text;
importação org.apache.hadoop.mapreduce.Reducer;
/**
* @author kaushik pal
*
* Esta é a função redutor. Ele agrega a saída com base no
* triagem de pares de valor-chave.
*/
classe pública DevXReducer estende Redutor<Texto ,Texto,Texto,Texto>
{
// Criar variáveis para caminho de arquivo
positive_file_path path;
negative_file_path path;
output_file_path path;
keyword_file_path path;
// Criar variáveis para tampão
BufferedReader positive_buff_reader;
BufferedReader negative_buff_reader;
BufferedReader keyword_buff_reader;
// Criar variáveis para o cálculo
total_record_count Duplo estática = new Duplo(“0”);
count_neg Duplo estática = new Duplo(“0”);
count_pos static double = new Duplo(“0”);
count_neu Duplo estática = new Duplo(“0”);
percent_neg Duplo estática = new Duplo(“0”);
percent_pos static double = new Duplo(“0”);
percent_neu Duplo estática = new Duplo(“0”);
pattrn_matcher Pattern;
partidas matcher_txt;
static int NEW_ROW = 0;
FSDataOutputStream out_1st,out_2nd;
/**
* chave @param
* valores @param
* @param contexto
* @throws IOException
* @throws InterruptedException
*/
public void reduzir(chave de texto, Iterable<Texto> valores,contexto contexto) lança IOException, InterruptedException
{
// Criar configuração para redutor
reduce_config = new Configuration();
// arquivos de configuração de carga do Hadoop
reduce_config.addResource(novo Caminho(“/user / hadoop / core-site.xml”));
reduce_config.addResource(novo Caminho(“/user / hadoop / hdfs-site.xml”));
// Criar variáveis
Cordas key_word = “”;
Cordas check_keyword = key_word;
keyword_file_path = new Path(“files / repositório / keys.txt”);
FileSystem file_system_read = FileSystem.get(URI.create(“files / repositório / keys.txt”),nova configuração());
keyword_buff_reader = new BufferedReader(nova InputStreamReader(file_system_read.open(keyword_file_path)));
FileSystem get_filesys = FileSystem.get(reduce_config);
FileSystem get_filesys_posneg = FileSystem.get(reduce_config);
Caminho path_output = new Path(“/user / sentiment_output_file.txt”);
Caminho path_output_posneg = new Path(“/user / posneg_output_file.txt”);
// obter palavra-chave
while(keyword_buff_reader.ready())
{
key_word = keyword_buff_reader.readLine().aparar();
}
// Verifique o sistema de arquivos
se (!get_filesys.exists(path_output)) {
out_1st = get_filesys.create(path_output);
out_2nd = get_filesys_posneg.create(path_output_posneg);
}
// Verifique correspondência de palavra-chave usando dicionários positivos e negativos
se(check_keyword.equals(key.toString().toLowerCase()))
{
para(new_tweets texto:valores)
{
// Coloque dicionário palavra positiva
positive_file_path = new Path(“/user / map_reduce / pos_words.txt”);
FileSystem filesystem_one = FileSystem.get(URI.create(“files / pos_words.txt”),nova configuração());
positive_buff_reader = new BufferedReader(nova InputStreamReader(filesystem_one.open(positive_file_path)));
// Coloque disctinary palavra negativa
negative_file_path = new Path(“/user / map_reduce / neg_words.txt”);
FileSystem filesystem_two = FileSystem.get(URI.create(“files / neg_words.txt”),nova configuração());
negative_buff_reader = new BufferedReader(nova InputStreamReader(filesystem_two.open(negative_file_path)));
++total_record_count;
boolean first_flag = false;
boolean second_flag = false;
Cordas all_tweets = new_tweets.toString();
Cordas first_regex = “”;
Cordas second_regex = “”;
while(positive_buff_reader.ready())
{
first_regex = positive_buff_reader.readLine().aparar();
NEW_ROW ;
pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);
matcher_txt = pattrn_matcher.matcher(all_tweets);
first_flag = matcher_txt.find();
se(first_flag)
{
out_2nd.writeBytes(all_tweets);
context.write(nOVO TEXTO(first_regex),nOVO TEXTO(all_tweets));
pausa;
}
}
while(negative_buff_reader.ready())
{
NEW_ROW ;
second_regex = negative_buff_reader.readLine().aparar();
pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);
matcher_txt = pattrn_matcher.matcher(all_tweets);
second_flag = matcher_txt.find();
se(second_flag)
{
out_2nd.writeBytes(all_tweets);
context.write(nOVO TEXTO(second_regex),nOVO TEXTO(all_tweets));
pausa;
}
}
se(first_flag&second_flag)
{
++count_neu;
}
else
{
se(first_flag)
{
++count_pos;
}
se(second_flag)
{
++count_neg;
}
se(first_flag == false&second_flag == false)
{
++count_neu;
}
}
// Fechar buffers
negative_buff_reader.close();
positive_buff_reader.close();
}
// Calcule os valores percentuais
percent_pos = count_pos / total_record_count * 100;
percent_neg = count_neg / total_record_count * 100;
percent_neu = count_neu / total_record_count * 100;
tentar{
// Escreva para os arquivos
out_1st.writeBytes(“\n”+key_word);
out_1st.writeBytes(“,”+total_record_count);
out_1st.writeBytes(“,”+percent_neg);
out_1st.writeBytes(“,”+percent_pos);
out_1st.writeBytes(“,”+percent_neu);
// Fechar sistemas de arquivos
out_1st.close();
get_filesys.close();
}pegar(Exception e){
e.printStackTrace();
}
}
}
}
[/código]
Conclusão: Neste artigo, discutimos o processamento de MapReduce usando o ambiente de programação Java. O componente diferente, como Mapa e reduzir a função executar a tarefa principal e retorna a saída para o cliente. O processamento realiza eficientemente em ambiente distribuído apenas. Assim, devemos definir o quadro Apache Hadoop em um ambiente distribuído para obter o melhor resultado.
Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.