Kroki napisać Map Reduce program przy użyciu Hadoop ramy i Java

MapReduce

MapReduce

Przegląd: Big Data jest nowym paradygmatem i jej przetwarzanie jest najważniejszym obszarem skoncentrować. Big dane odnoszą się do wielkiej ilości danych, które pochodzą z różnych źródeł, takich jak social media, dane czujnika, Dane mobilne, Dane www i wiele więcej. W tym artykule skupimy się na części przetwarzania przy użyciu Hadoop ramy oraz Map-Reduce programowania. Map-Reduce można zdefiniować jako szczególny rodzaj ram programowych stosowanych do przetwarzania dużej ilości danych w rozproszonym ramowej nazywa sprzęcie. Więc artykule opisze Map-Reduce pojęć i ich praktyczne wdrożenie za pomocą kodu Java.

Wprowadzenie: Hadoop MapReduce może być zdefiniowana jako ramy programowania oprogramowanie służące do przetwarzania duże ilości danych (w poziomie terabajtów) w równoległym środowisku klastrowych węzłów. Klaster składa się z tysięcy węzłów sprzęcie. Przetwarzanie jest rozprowadzany, niezawodne i odporne na uszkodzenia. Typowe zadanie MapReduce odbywa się zgodnie z następującymi etapami

1) Podzielić dane na niezależne kawałki oparte na parę klucz-wartość. Odbywa się to poprzez zadanie MAP w sposób równoległy.

2) Obecnie wprowadzone z pracy mapa jest sortowane na podstawie wartości klucza.

3) Uporządkowany wyjście jest wejściem do pracy Reduce. A to daje ostateczny wynik do przetwarzania i zwraca wynik do klienta.

Note: sprzęcie jest w zasadzie Polish systemy komputerowe

MapReduce ramy: Apache Hadoop MapReduce ramy jest napisany w Javie. Ramy składa się z konfiguracji master-slave. Mistrz jest znany jako JobTracker i niewolnicy są znane jako TaskTrackers.The mistrz kontroluje zadania przetwarzane w niewolników (które nie są niczym innym węzłów w klastrze). Obliczenia dokonuje się na niewolników. Tak więc obliczyć i magazyny węzły są takie same w środowisku klastrowym. Koncepcja jest ‘ przesunąć obliczeń z węzłami, w których dane są przechowywane ", i to sprawia, że ​​przetwarzanie szybciej.

Przetwarzanie MapReduce: Model ramy MapReduce jest bardzo lekki. Więc koszt sprzętu jest niska w porównaniu do innych ram. Ale w tym samym czasie powinniśmy zrozumieć, że model działa sprawnie tylko w środowisku rozproszonym, jak przetwarzanie odbywa się na węzłach, gdzie rezyduje danych. Inne funkcje, takie jak skalowalność, niezawodność i Fault Tolerance również działa dobrze w środowisku rozproszonym.

wdrożenie MapReduce: Teraz omówimy o wdrożeniu modelu MapReduce wykorzystaniem platformy programowania Java. Poniżej znajdują się różne elementy całego końca do końca realizacji.

  • The program kliencki która jest klasą kierowca i inicjuje proces
  • The funkcja map która wykonuje split przy użyciu parę klucz-wartość.
  • The funkcja Redukcja które agregują przetworzone dane i wysłać wyjście do klienta.

Klasa sterownika: Poniżej przedstawiono klasy sterownika, który wiąże mapę i funkcji Redukcja i rozpoczyna przetwarzanie. To program klienta, który inicjuje proces.

Listing1: Program kliencki (Klasa sterownika) inicjowanie procesu

[kod]

pakiet com.mapreduce.techalpine;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**

* @author Kaushik pal

*

* Jest to główny klasy sterownika do zainicjowania mapę, zmniejszyć

* process. Przedstawia ona w ziemię z powrotem przez cały proces i

* Następnie rozpoczyna się.

*/

public class DevXDriver {

public static void main(Smyczkowy[] args) throws Exception {

// Inicjowanie konfiguracji

configx Konfiguracja = new Configuration();

// Dodaj pliki zasobów

configx.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));

configx.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));

// Tworzenie MapReduce pracę

devxmapjob Job = new job(configx,”DevXDriver.class”);

devxmapjob.setJarByClass(DevXDriver.class);

devxmapjob.setJobName(“DevX MapReduce Praca”);

// Ustaw wyjście Kay i klasa wartości

devxmapjob.setOutputKeyClass(Text.class);

devxmapjob.setOutputValueClass(Text.class);

// Zestaw klasy Mapa

devxmapjob.setMapperClass(DevXMap.class);

// Ustaw klasy Sumator

devxmapjob.setCombinerClass(DevXReducer.class);

// Zestaw klasy redukcyjna

devxmapjob.setReducerClass(DevXReducer.class);

// Ustaw Mapa kluczowe wyjściowego i wartości klas

devxmapjob.setMapOutputKeyClass(Text.class);

devxmapjob.setMapOutputValueClass(Text.class);

// Ustaw liczbę zadań reduktorem

devxmapjob.setNumReduceTasks(10);

// Ustaw klasy formatów wejściowych i wyjściowych

devxmapjob.setInputFormatClass(TextInputFormat.class);

devxmapjob.setOutputFormatClass(TextOutputFormat.class);

// Ustaw wejście i wyjście ścieżki

FileInputFormat.addInputPath(devxmapjob, nowa ścieżka(“/user / map_reduce / input /”));

FileOutputFormat.setOutputPath(devxmapjob,nowa ścieżka(“/user / map_reduce / wyjścia”));

// Zacznij MapReduce pracę

devxmapjob.waitForCompletion(prawdziwy);

}

}

[/kod]

funkcja map: Jest odpowiedzialny za rozdzielenie danych na podstawie wartości klucza parę. Jest to znane jako mapowanie danych.

Listing2: Jest to funkcja Map dzieląc dane na kawałki

[kod]

pakiet com.mapreduce.techalpine;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.StringTokenizer;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/**

* @author Kaushik pal

*

* Jest to proces na mapie. Czyni mapowanie dla pary słów kluczowych wartości.

*/

public class DevXMap rozciąga Mapper<LongWritable, Tekst, Tekst,Tekst> {

// Tworzenie ścieżki, BufferedReader i zmienne tekstowe

ścieżka ścieżka_pliku;

BufferedReader buffer_reader;

tweet_values ​​text = new Text();

/**

* kluczem @param

* wartość @param

* @param kontekstu

*/

public void mapie(kluczem LongWritable, wartość tekstowa, kontekst kontekst) {

próbować{

// Utwórz konfigurację mapą

map_config Konfiguracja = new Configuration();

// Załaduj podstawowe pliki Hadoop w konfiguracji

map_config.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));

map_config.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));

// Tworzenie zmiennych

String searchkeyword = “”;

// Otwórz plik ze ścieżki pliku

ścieżka_pliku = new Path(“Pliki / repozytorium / keys.txt”);

FileSystem system_plików = FileSystem.get(URI.create(“Pliki / repozytorium / keys.txt”),nowa konfiguracja());

// Czytelnik bufora obciążenia

buffer_reader = new BufferedReader(nowy InputStreamReader(file_system.open(file_path)));

podczas gdy(buffer_reader.ready())

{

searchkeyword = buffer_reader.readLine().trym();

}

// Uzyskaj wartość klucza

Ostateczna key_value Text = new Text(Znajdź kluczowe słowo);

// Sprawdź wartość i podjąć decyzję

jeśli(wartość == null)

{

powrót;

}

else{

StringTokenizer string_tokens = new StringTokenizer(Value.ToString(),”,”);

int count = 0;

podczas gdy(string_tokens.hasMoreTokens()) {

count ++;

jeśli(count <= 1)

Kontyntynuj;

String new_tweet_value = string_tokens.nextToken().toLowerCase().trym().Zamień wszystko(“\\*”,””);

jeśli(new_tweet_value.contains(searchkeyword.toLowerCase().trym())) {

tweet_values.set(new_tweet_value);

context.write(kluczowa wartość,tweet_values);

}

}

}

}

złapać(Exception){

e.printStackTrace();

}

}

}

[/kod]

funkcja Redukcja: Jest odpowiedzialny za agregacji. Agregacja jest przeprowadzić na podstawie wartości klucza. Więc po przetworzeniu i sortowania agregacja zakończona i wysyła wynik z powrotem do programu klienckiego.

Listing3: Funkcja Redukcja agreguje przetwarzanych danych

[kod]

pakiet com.mapreduce.techalpine;

import java.io.BufferedReader;

java.io.IOException import;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.RandomAccess;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/**

* @author Kaushik pal

*

* Jest to funkcja reduktor. To agreguje dane wyjściowe oparte na

* Sortowanie par klucz-wartość.

*/

public class DevXReducer rozciąga Reducer<Tekst ,Tekst,Tekst,Tekst>

{

// Tworzenie zmiennych dla ścieżki pliku

ścieżka positive_file_path;

ścieżka negative_file_path;

ścieżka output_file_path;

ścieżka keyword_file_path;

// Tworzenie zmiennych dla bufor

BufferedReader positive_buff_reader;

BufferedReader negative_buff_reader;

BufferedReader keyword_buff_reader;

// Tworzenie zmiennych dla obliczeń

statyczne dwukrotnie total_record_count = new Podwójna(“0”);

statyczne dwukrotnie count_neg = new Podwójna(“0”);

statyczne Podwójne count_pos = new Podwójna(“0”);

statyczne dwukrotnie count_neu = new Podwójna(“0”);

statyczne dwukrotnie percent_neg = new Podwójna(“0”);

statyczne Podwójne percent_pos = new Podwójna(“0”);

statyczne dwukrotnie percent_neu = new Podwójna(“0”);

pattrn_matcher wzór;

mecze matcher_txt;

static int NEW_ROW = 0;

FSDataOutputStream out_1st,out_2nd;

/**

* kluczem @param

* wartości @param

* @param kontekstu

* @throws IOException

* @throws InterruptedException

*/

public void zmniejszyć(Kluczowy tekst, iterowalny<Tekst> values,kontekst kontekst) rzuca IOException, InterruptedException

{

// Utwórz konfigurację reduktora

reduce_config Konfiguracja = new Configuration();

// Hadoop załadować pliki konfiguracyjne

reduce_config.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));

reduce_config.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));

// Tworzenie zmiennych

String key_word = “”;

String check_keyword = key_word;

keyword_file_path = new Path(“Pliki / repozytorium / keys.txt”);

FileSystem file_system_read = FileSystem.get(URI.create(“Pliki / repozytorium / keys.txt”),nowa konfiguracja());

keyword_buff_reader = new BufferedReader(nowy InputStreamReader(file_system_read.open(keyword_file_path)));

FileSystem get_filesys = FileSystem.get(reduce_config);

FileSystem get_filesys_posneg = FileSystem.get(reduce_config);

Ścieżka path_output = new Path(“/user / sentiment_output_file.txt”);

Ścieżka path_output_posneg = new Path(“/user / posneg_output_file.txt”);

// Uzyskaj słowa kluczowego

podczas gdy(keyword_buff_reader.ready())

{

key_word = keyword_buff_reader.readLine().trym();

}

// Sprawdź system plików

jeśli (!get_filesys.exists(path_output)) {

out_1st = get_filesys.create(path_output);

out_2nd = get_filesys_posneg.create(path_output_posneg);

}

// Sprawdź dopasowanie słów kluczowych za pomocą pozytywnych i negatywnych słowniki

jeśli(check_keyword.equals(key.toString().toLowerCase()))

{

dla(new_tweets tekstowe:values)

{

// Załaduj pozytywny słownika słowo

positive_file_path = new Path(“/user / map_reduce / pos_words.txt”);

FileSystem filesystem_one = FileSystem.get(URI.create(“Pliki / pos_words.txt”),nowa konfiguracja());

positive_buff_reader = new BufferedReader(nowy InputStreamReader(filesystem_one.open(positive_file_path)));

// Załaduj negatywne słowo disctinary

negative_file_path = new Path(“/user / map_reduce / neg_words.txt”);

FileSystem filesystem_two = FileSystem.get(URI.create(“Pliki / neg_words.txt”),nowa konfiguracja());

negative_buff_reader = new BufferedReader(nowy InputStreamReader(filesystem_two.open(negative_file_path)));

++total_record_count;

logiczna first_flag = false;

logiczna second_flag = false;

String all_tweets = new_tweets.toString();

String first_regex = “”;

String second_regex = “”;

podczas gdy(positive_buff_reader.ready())

{

first_regex = positive_buff_reader.readLine().trym();

NEW_ROW ;

pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

first_flag = matcher_txt.find();

 

jeśli(first_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(nowy tekst(first_regex),nowy tekst(all_tweets));

przerwa;

}

}

podczas gdy(negative_buff_reader.ready())

{

NEW_ROW ;

second_regex = negative_buff_reader.readLine().trym();

pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

second_flag = matcher_txt.find();

jeśli(second_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(nowy tekst(second_regex),nowy tekst(all_tweets));

przerwa;

}

}

jeśli(first_flag&second_flag)

{

++count_neu;

}

else

{

jeśli(first_flag)

{

++count_pos;

}

jeśli(second_flag)

{

++count_neg;

}

jeśli(first_flag == false&second_flag == false)

{

++count_neu;

}

}

// Zamknij bufory

negative_buff_reader.close();

positive_buff_reader.close();

}

// Oblicz wartości procentowe

percent_pos = count_pos / total_record_count * 100;

percent_neg = count_neg / total_record_count * 100;

percent_neu = count_neu / total_record_count * 100;

próbować{

// Napisz do plików

out_1st.writeBytes(“\n”+key_word);

out_1st.writeBytes(“,”+total_record_count);

out_1st.writeBytes(“,”+percent_neg);

out_1st.writeBytes(“,”+percent_pos);

out_1st.writeBytes(“,”+percent_neu);

// Zamknij systemy plików

out_1st.close();

get_filesys.close();

}złapać(Exception){

e.printStackTrace();

}

}

}

}

[/kod]

Wniosek: W niniejszym artykule omówiłem przetwarzanie MapReduce przy użyciu środowiska programowania Java. Odmienne elementem funkcji jak mapy oraz zmniejszyć wykonać główne zadanie i zwraca wyjście do klienta. Przetwarzanie wykonuje skutecznie na środowisku rozproszonym tylko. Więc powinniśmy ustanowić ramy Apache Hadoop w środowisku rozproszonym, aby uzyskać najlepszy wynik.

Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.

Tagged on: ,
============================================= ============================================== Buy best TechAlpine Books on Amazon
============================================== ---------------------------------------------------------------- electrician ct chestnutelectric
error

Enjoy this blog? Please spread the word :)

Follow by Email
LinkedIn
LinkedIn
Share