Przegląd: Big Data jest nowym paradygmatem i jej przetwarzanie jest najważniejszym obszarem skoncentrować. Big dane odnoszą się do wielkiej ilości danych, które pochodzą z różnych źródeł, takich jak social media, dane czujnika, Dane mobilne, Dane www i wiele więcej. W tym artykule skupimy się na części przetwarzania przy użyciu Hadoop ramy oraz Map-Reduce programowania. Map-Reduce można zdefiniować jako szczególny rodzaj ram programowych stosowanych do przetwarzania dużej ilości danych w rozproszonym ramowej nazywa sprzęcie. Więc artykule opisze Map-Reduce pojęć i ich praktyczne wdrożenie za pomocą kodu Java.
Wprowadzenie: Hadoop MapReduce może być zdefiniowana jako ramy programowania oprogramowanie służące do przetwarzania duże ilości danych (w poziomie terabajtów) w równoległym środowisku klastrowych węzłów. Klaster składa się z tysięcy węzłów sprzęcie. Przetwarzanie jest rozprowadzany, niezawodne i odporne na uszkodzenia. Typowe zadanie MapReduce odbywa się zgodnie z następującymi etapami
1) Podzielić dane na niezależne kawałki oparte na parę klucz-wartość. Odbywa się to poprzez zadanie MAP w sposób równoległy.
2) Obecnie wprowadzone z pracy mapa jest sortowane na podstawie wartości klucza.
3) Uporządkowany wyjście jest wejściem do pracy Reduce. A to daje ostateczny wynik do przetwarzania i zwraca wynik do klienta.
Note: sprzęcie jest w zasadzie Polish systemy komputerowe
MapReduce ramy: Apache Hadoop MapReduce ramy jest napisany w Javie. Ramy składa się z konfiguracji master-slave. Mistrz jest znany jako JobTracker i niewolnicy są znane jako TaskTrackers.The mistrz kontroluje zadania przetwarzane w niewolników (które nie są niczym innym węzłów w klastrze). Obliczenia dokonuje się na niewolników. Tak więc obliczyć i magazyny węzły są takie same w środowisku klastrowym. Koncepcja jest ‘ przesunąć obliczeń z węzłami, w których dane są przechowywane ", i to sprawia, że przetwarzanie szybciej.
Przetwarzanie MapReduce: Model ramy MapReduce jest bardzo lekki. Więc koszt sprzętu jest niska w porównaniu do innych ram. Ale w tym samym czasie powinniśmy zrozumieć, że model działa sprawnie tylko w środowisku rozproszonym, jak przetwarzanie odbywa się na węzłach, gdzie rezyduje danych. Inne funkcje, takie jak skalowalność, niezawodność i Fault Tolerance również działa dobrze w środowisku rozproszonym.
wdrożenie MapReduce: Teraz omówimy o wdrożeniu modelu MapReduce wykorzystaniem platformy programowania Java. Poniżej znajdują się różne elementy całego końca do końca realizacji.
- The program kliencki która jest klasą kierowca i inicjuje proces
- The funkcja map która wykonuje split przy użyciu parę klucz-wartość.
- The funkcja Redukcja które agregują przetworzone dane i wysłać wyjście do klienta.
Klasa sterownika: Poniżej przedstawiono klasy sterownika, który wiąże mapę i funkcji Redukcja i rozpoczyna przetwarzanie. To program klienta, który inicjuje proces.
Listing1: Program kliencki (Klasa sterownika) inicjowanie procesu
[kod]
pakiet com.mapreduce.techalpine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @author Kaushik pal
*
* Jest to główny klasy sterownika do zainicjowania mapę, zmniejszyć
* process. Przedstawia ona w ziemię z powrotem przez cały proces i
* Następnie rozpoczyna się.
*/
public class DevXDriver {
public static void main(Smyczkowy[] args) throws Exception {
// Inicjowanie konfiguracji
configx Konfiguracja = new Configuration();
// Dodaj pliki zasobów
configx.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));
configx.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));
// Tworzenie MapReduce pracę
devxmapjob Job = new job(configx,”DevXDriver.class”);
devxmapjob.setJarByClass(DevXDriver.class);
devxmapjob.setJobName(“DevX MapReduce Praca”);
// Ustaw wyjście Kay i klasa wartości
devxmapjob.setOutputKeyClass(Text.class);
devxmapjob.setOutputValueClass(Text.class);
// Zestaw klasy Mapa
devxmapjob.setMapperClass(DevXMap.class);
// Ustaw klasy Sumator
devxmapjob.setCombinerClass(DevXReducer.class);
// Zestaw klasy redukcyjna
devxmapjob.setReducerClass(DevXReducer.class);
// Ustaw Mapa kluczowe wyjściowego i wartości klas
devxmapjob.setMapOutputKeyClass(Text.class);
devxmapjob.setMapOutputValueClass(Text.class);
// Ustaw liczbę zadań reduktorem
devxmapjob.setNumReduceTasks(10);
// Ustaw klasy formatów wejściowych i wyjściowych
devxmapjob.setInputFormatClass(TextInputFormat.class);
devxmapjob.setOutputFormatClass(TextOutputFormat.class);
// Ustaw wejście i wyjście ścieżki
FileInputFormat.addInputPath(devxmapjob, nowa ścieżka(“/user / map_reduce / input /”));
FileOutputFormat.setOutputPath(devxmapjob,nowa ścieżka(“/user / map_reduce / wyjścia”));
// Zacznij MapReduce pracę
devxmapjob.waitForCompletion(prawdziwy);
}
}
[/kod]
funkcja map: Jest odpowiedzialny za rozdzielenie danych na podstawie wartości klucza parę. Jest to znane jako mapowanie danych.
Listing2: Jest to funkcja Map dzieląc dane na kawałki
[kod]
pakiet com.mapreduce.techalpine;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author Kaushik pal
*
* Jest to proces na mapie. Czyni mapowanie dla pary słów kluczowych wartości.
*/
public class DevXMap rozciąga Mapper<LongWritable, Tekst, Tekst,Tekst> {
// Tworzenie ścieżki, BufferedReader i zmienne tekstowe
ścieżka ścieżka_pliku;
BufferedReader buffer_reader;
tweet_values text = new Text();
/**
* kluczem @param
* wartość @param
* @param kontekstu
*/
public void mapie(kluczem LongWritable, wartość tekstowa, kontekst kontekst) {
próbować{
// Utwórz konfigurację mapą
map_config Konfiguracja = new Configuration();
// Załaduj podstawowe pliki Hadoop w konfiguracji
map_config.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));
map_config.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));
// Tworzenie zmiennych
String searchkeyword = “”;
// Otwórz plik ze ścieżki pliku
ścieżka_pliku = new Path(“Pliki / repozytorium / keys.txt”);
FileSystem system_plików = FileSystem.get(URI.create(“Pliki / repozytorium / keys.txt”),nowa konfiguracja());
// Czytelnik bufora obciążenia
buffer_reader = new BufferedReader(nowy InputStreamReader(file_system.open(file_path)));
podczas gdy(buffer_reader.ready())
{
searchkeyword = buffer_reader.readLine().trym();
}
// Uzyskaj wartość klucza
Ostateczna key_value Text = new Text(Znajdź kluczowe słowo);
// Sprawdź wartość i podjąć decyzję
jeśli(wartość == null)
{
powrót;
}
else{
StringTokenizer string_tokens = new StringTokenizer(Value.ToString(),”,”);
int count = 0;
podczas gdy(string_tokens.hasMoreTokens()) {
count ++;
jeśli(count <= 1)
Kontyntynuj;
String new_tweet_value = string_tokens.nextToken().toLowerCase().trym().Zamień wszystko(“\\*”,””);
jeśli(new_tweet_value.contains(searchkeyword.toLowerCase().trym())) {
tweet_values.set(new_tweet_value);
context.write(kluczowa wartość,tweet_values);
}
}
}
}
złapać(Exception){
e.printStackTrace();
}
}
}
[/kod]
funkcja Redukcja: Jest odpowiedzialny za agregacji. Agregacja jest przeprowadzić na podstawie wartości klucza. Więc po przetworzeniu i sortowania agregacja zakończona i wysyła wynik z powrotem do programu klienckiego.
Listing3: Funkcja Redukcja agreguje przetwarzanych danych
[kod]
pakiet com.mapreduce.techalpine;
import java.io.BufferedReader;
java.io.IOException import;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.RandomAccess;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @author Kaushik pal
*
* Jest to funkcja reduktor. To agreguje dane wyjściowe oparte na
* Sortowanie par klucz-wartość.
*/
public class DevXReducer rozciąga Reducer<Tekst ,Tekst,Tekst,Tekst>
{
// Tworzenie zmiennych dla ścieżki pliku
ścieżka positive_file_path;
ścieżka negative_file_path;
ścieżka output_file_path;
ścieżka keyword_file_path;
// Tworzenie zmiennych dla bufor
BufferedReader positive_buff_reader;
BufferedReader negative_buff_reader;
BufferedReader keyword_buff_reader;
// Tworzenie zmiennych dla obliczeń
statyczne dwukrotnie total_record_count = new Podwójna(“0”);
statyczne dwukrotnie count_neg = new Podwójna(“0”);
statyczne Podwójne count_pos = new Podwójna(“0”);
statyczne dwukrotnie count_neu = new Podwójna(“0”);
statyczne dwukrotnie percent_neg = new Podwójna(“0”);
statyczne Podwójne percent_pos = new Podwójna(“0”);
statyczne dwukrotnie percent_neu = new Podwójna(“0”);
pattrn_matcher wzór;
mecze matcher_txt;
static int NEW_ROW = 0;
FSDataOutputStream out_1st,out_2nd;
/**
* kluczem @param
* wartości @param
* @param kontekstu
* @throws IOException
* @throws InterruptedException
*/
public void zmniejszyć(Kluczowy tekst, iterowalny<Tekst> values,kontekst kontekst) rzuca IOException, InterruptedException
{
// Utwórz konfigurację reduktora
reduce_config Konfiguracja = new Configuration();
// Hadoop załadować pliki konfiguracyjne
reduce_config.addResource(nowa ścieżka(“/user / Hadoop / core-site.xml”));
reduce_config.addResource(nowa ścieżka(“/user / Hadoop / HDFS-site.xml”));
// Tworzenie zmiennych
String key_word = “”;
String check_keyword = key_word;
keyword_file_path = new Path(“Pliki / repozytorium / keys.txt”);
FileSystem file_system_read = FileSystem.get(URI.create(“Pliki / repozytorium / keys.txt”),nowa konfiguracja());
keyword_buff_reader = new BufferedReader(nowy InputStreamReader(file_system_read.open(keyword_file_path)));
FileSystem get_filesys = FileSystem.get(reduce_config);
FileSystem get_filesys_posneg = FileSystem.get(reduce_config);
Ścieżka path_output = new Path(“/user / sentiment_output_file.txt”);
Ścieżka path_output_posneg = new Path(“/user / posneg_output_file.txt”);
// Uzyskaj słowa kluczowego
podczas gdy(keyword_buff_reader.ready())
{
key_word = keyword_buff_reader.readLine().trym();
}
// Sprawdź system plików
jeśli (!get_filesys.exists(path_output)) {
out_1st = get_filesys.create(path_output);
out_2nd = get_filesys_posneg.create(path_output_posneg);
}
// Sprawdź dopasowanie słów kluczowych za pomocą pozytywnych i negatywnych słowniki
jeśli(check_keyword.equals(key.toString().toLowerCase()))
{
dla(new_tweets tekstowe:values)
{
// Załaduj pozytywny słownika słowo
positive_file_path = new Path(“/user / map_reduce / pos_words.txt”);
FileSystem filesystem_one = FileSystem.get(URI.create(“Pliki / pos_words.txt”),nowa konfiguracja());
positive_buff_reader = new BufferedReader(nowy InputStreamReader(filesystem_one.open(positive_file_path)));
// Załaduj negatywne słowo disctinary
negative_file_path = new Path(“/user / map_reduce / neg_words.txt”);
FileSystem filesystem_two = FileSystem.get(URI.create(“Pliki / neg_words.txt”),nowa konfiguracja());
negative_buff_reader = new BufferedReader(nowy InputStreamReader(filesystem_two.open(negative_file_path)));
++total_record_count;
logiczna first_flag = false;
logiczna second_flag = false;
String all_tweets = new_tweets.toString();
String first_regex = “”;
String second_regex = “”;
podczas gdy(positive_buff_reader.ready())
{
first_regex = positive_buff_reader.readLine().trym();
NEW_ROW ;
pattrn_matcher = Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);
matcher_txt = pattrn_matcher.matcher(all_tweets);
first_flag = matcher_txt.find();
jeśli(first_flag)
{
out_2nd.writeBytes(all_tweets);
context.write(nowy tekst(first_regex),nowy tekst(all_tweets));
przerwa;
}
}
podczas gdy(negative_buff_reader.ready())
{
NEW_ROW ;
second_regex = negative_buff_reader.readLine().trym();
pattrn_matcher = Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);
matcher_txt = pattrn_matcher.matcher(all_tweets);
second_flag = matcher_txt.find();
jeśli(second_flag)
{
out_2nd.writeBytes(all_tweets);
context.write(nowy tekst(second_regex),nowy tekst(all_tweets));
przerwa;
}
}
jeśli(first_flag&second_flag)
{
++count_neu;
}
else
{
jeśli(first_flag)
{
++count_pos;
}
jeśli(second_flag)
{
++count_neg;
}
jeśli(first_flag == false&second_flag == false)
{
++count_neu;
}
}
// Zamknij bufory
negative_buff_reader.close();
positive_buff_reader.close();
}
// Oblicz wartości procentowe
percent_pos = count_pos / total_record_count * 100;
percent_neg = count_neg / total_record_count * 100;
percent_neu = count_neu / total_record_count * 100;
próbować{
// Napisz do plików
out_1st.writeBytes(“\n”+key_word);
out_1st.writeBytes(“,”+total_record_count);
out_1st.writeBytes(“,”+percent_neg);
out_1st.writeBytes(“,”+percent_pos);
out_1st.writeBytes(“,”+percent_neu);
// Zamknij systemy plików
out_1st.close();
get_filesys.close();
}złapać(Exception){
e.printStackTrace();
}
}
}
}
[/kod]
Wniosek: W niniejszym artykule omówiłem przetwarzanie MapReduce przy użyciu środowiska programowania Java. Odmienne elementem funkcji jak mapy oraz zmniejszyć wykonać główne zadanie i zwraca wyjście do klienta. Przetwarzanie wykonuje skutecznie na środowisku rozproszonym tylko. Więc powinniśmy ustanowić ramy Apache Hadoop w środowisku rozproszonym, aby uzyskać najlepszy wynik.
Hope you have enjoyed the article and you will be able to implement it in your practical programming. Keep reading.