Steps to write Map Reduce program using Hadoop framework and java

MapReduce

MapReduce

개요: 빅 데이터는 새로운 패러다임과 처리 집중 가장 중요한 영역이다. 빅 데이터는 소셜 미디어와 같은 다양한 소스에서 오는 데이터의 큰 볼륨을 말한다, 센서 데이터, 모바일 데이터, 인터넷 데이터 및 많은. 이 문서에서 우리는 하둡 프레임 워크를 사용하여 처리 부분에 집중 프로그램을지도-줄 것이다. 맵 감소는 분산 구조라는 범용 하드웨어에 대용량의 데이터를 처리하는 데 사용되는 프로그래밍 프레임 워크의 특수한 유형으로 정의 될 수있다. 따라서 문서가 자바 코드를 이용하여 맵을 줄이기 개념 및 그 실제 구현을 설명 할 것이다.

소개: 하둡 MapReduce는이 데이터의 큰 양을 처리하기 위해 사용되는 소프트웨어 프로그래밍 구조로 정의 할 수있다 (테라 바이트 수준) 클러스터 노드의 병렬 환경에서. 클러스터는 범용 하드웨어의 수천 개의 노드로 구성. 처리는 분배, 신뢰성 및 내결함성. 전형적인 MapReduce의 작업은 다음과 같은 단계에 따라 수행된다

1) 키 - 값 쌍에 기반한 독립적 인 덩어리로 데이터를 분할. 이것은 병렬 방식으로지도 작업에 의해 이루어집니다.

2) 지도 작업의 넣어 밖으로는 키 값을 기준으로 정렬.

3) 정렬 된 출력은 줄이고 작업에 입력. 그리고, 그 처리로 최종적인 출력을 생성하고 클라이언트에게 결과를 반환.

Note: 상품 하드웨어는 기본적으로 저렴한 비용으로 컴퓨터 시스템입니다

맵리 듀스 프레임 워크: 아파치 하둡 맵리 듀스 프레임 워크는 자바로 작성. 이 프레임 워크는 마스터 - 슬레이브 구성이 이루어져. TaskTrackers.The 마스터가 슬레이브에 처리 작업을 제어하는, 상기 마스터는 JobTracker에라고하며 슬레이브는 공지 (클러스터 노드 지나지되는 없다). 계산은 노예에서 수행됩니다. 그래서 컴퓨팅 및 스토리지 노드가 클러스터 환경에서 동일. 개념은 ‘ 노드에 계산 이동 데이터는 '저장 위치, 그리고 빨리 처리한다.

맵리 듀스 처리: 맵리 듀스 프레임 워크 모델은 매우 가벼운 무게. 따라서 하드웨어의 비용을 다른 프레임 워크에 비해 낮은. 그러나, 동시에 우리는 처리 노드 여기서 데이터 상주에서 수행 된 바와 같이 모델에만 분산 환경에서 효율적으로 작동하는 것으로 이해해야. 확장 성 등의 다른 기능, 신뢰성과 내결함성은 분산 환경에서 잘 작동합니다.

맵리 듀스 구현: 이제 우리는 자바 프로그래밍 플랫폼을 사용하여 맵리 듀스 모델의 구현에 대해 논의 할 것이다. 다음은 구현을 종료 전체 끝의 다른 구성 요소.

  • The 클라이언트 프로그램 이는 드라이버 클래스되고 프로세스를 개시
  • The 지도 기능 이는 키 - 값 쌍을 사용하여 분할을 수행.
  • The 기능을 감소 이는 처리 된 데이터를 집계하고, 클라이언트에게 출력을 전송할.

드라이버 클래스: 지도를 결합하여 기능을 줄이고 처리를 시작하는 드라이버 클래스는 다음과 같다. 이 과정을 시작하는 클라이언트 프로그램.

목록 1: 클라이언트 프로그램 (드라이버 클래스) 과정을 개시

[코드]

패키지 com.mapreduce.techalpine;

수입 org.apache.commons.logging.Log;

수입 org.apache.commons.logging.LogFactory;

수입 org.apache.hadoop.conf.Configuration;

수입 org.apache.hadoop.fs.Path;

수입 org.apache.hadoop.io.Text;

수입 org.apache.hadoop.mapreduce.Job;

수입 org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

수입 org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

수입 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

수입 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**

* @author Kaushik이 친구

*

* 이것은 시작하는 메인 드라이버 클래스이다지도 - 감소

* process. 그것은 전체 프로세스에 대해 다시 그라운드를 설정하고

* 그런 다음 시작.

*/

공용 클래스 DevXDriver {

공공 정적 무효 메인(끈[] 인수) throws Exception {

// 구성을 시작합니다

구성하여 configx = 새로운 구성();

// 리소스 파일 추가

configx.addResource(새로운 경로(“/사용자 / 하둡 / 코어를 site.xml”));

configx.addResource(새로운 경로(“/사용자 / 하둡 / HDFS-에 site.xml”));

// 맵리 듀스 작업을 만듭니다

작업 devxmapjob = 새로운 작업(하여 configx,”DevXDriver.class”);

devxmapjob.setJarByClass(DevXDriver.class);

devxmapjob.setJobName(“DevX 맵리 듀스 작업”);

// 설정 출력 케이와 값 클래스

devxmapjob.setOutputKeyClass(Text.class);

devxmapjob.setOutputValueClass(Text.class);

// 설정지도 클래스

devxmapjob.setMapperClass(DevXMap.class);

// 설정 결합기 클래스

devxmapjob.setCombinerClass(DevXReducer.class);

// 설정 감속기 클래스

devxmapjob.setReducerClass(DevXReducer.class);

// 지도 출력 키와 값 클래스를 설정

devxmapjob.setMapOutputKeyClass(Text.class);

devxmapjob.setMapOutputValueClass(Text.class);

// 감속기 작업의 설정 번호

devxmapjob.setNumReduceTasks(10);

// 입력 및 출력 형식 클래스를 설정

devxmapjob.setInputFormatClass(TextInputFormat.class);

devxmapjob.setOutputFormatClass(TextOutputFormat.class);

// 세트 입력 및 출력 경로

FileInputFormat.addInputPath(devxmapjob, 새로운 경로(“/사용자 / map_reduce / 입력 /”));

FileOutputFormat.setOutputPath(devxmapjob,새로운 경로(“/사용자 / map_reduce / 출력”));

// 맵리 듀스 작업을 시작합니다

devxmapjob.waitForCompletion(참된);

}

}

[/코드]

지도 기능: 이것은 키 - 값 쌍에 기초하여 데이터를 분할하기위한 책임. 이는 데이터의 매핑으로 알려져있다.

Listing2: 이 데이터 청크로 분할 맵 함수

[코드]

패키지 com.mapreduce.techalpine;

수입 java.io.BufferedReader의;

수입 java.io.InputStreamReader;

수입 java.net.URI의;

수입 java.util.StringTokenizer의;

수입 org.apache.commons.logging.Log;

수입 org.apache.commons.logging.LogFactory;

수입 org.apache.hadoop.conf.Configuration;

수입 org.apache.hadoop.fs.FileSystem;

수입 org.apache.hadoop.fs.Path;

수입 org.apache.hadoop.io.LongWritable;

수입 org.apache.hadoop.io.Text;

수입 org.apache.hadoop.mapreduce.Mapper;

/**

* @author Kaushik이 친구

*

* 이지도 과정. 이 키워드 값 쌍에 대한 매핑을 수행.

*/

공용 클래스 DevXMap는 매퍼를 확장<LongWritable, 본문, 본문,본문> {

// 경로 만들기, 의 BufferedReader와 텍스트 변수

경로 FILE_PATH;

의 BufferedReader의 buffer_reader;

텍스트 tweet_values ​​= 새로운 텍스트();

/**

* @param 키

* @param 값

* @param 컨텍스트

*/

공공 무효지도(LongWritable 키, 텍스트 값, 컨텍스트 컨텍스트) {

시도{

// 지도에 대한 구성을 만듭니다

구성 map_config = 새로운 구성();

// 구성에서 하둡 코어 파일을로드

map_config.addResource(새로운 경로(“/사용자 / 하둡 / 코어를 site.xml”));

map_config.addResource(새로운 경로(“/사용자 / 하둡 / HDFS-에 site.xml”));

// 변수를 만듭니다

문자열 searchkeyword = “”;

// 파일 경로에서 파일 열기

FILE_PATH = 새로운 경로(“파일 / 저장소 / keys.txt”);

파일 시스템 인 file_system = FileSystem.get(URI.create(“파일 / 저장소 / keys.txt”),새로운 구성());

// 로드 버퍼 리더

buffer_reader = 새의 BufferedReader(새에 InputStreamReader(file_system.open(file_path)));

while(buffer_reader.ready())

{

searchkeyword = buffer_reader.readLine().손질();

}

// 키 값을 가져 오기

최종 텍스트 key_value = 새로운 텍스트(searchkeyword);

// 값을 확인하고 결정을

면(값 == 널 (null))

{

return;

}

그밖에{

그러면 StringTokenizer string_tokens = 새로운 그러면 StringTokenizer(value.toString(),”,”);

INT 카운트 = 0;

while(string_tokens.hasMoreTokens()) {

count ++;

면(count <= 1)

잇다;

문자열 new_tweet_value = string_tokens.nextToken().와 toLowerCase().손질().대체하기(“\\*”,””);

면(new_tweet_value.contains(searchkeyword.toLowerCase().손질())) {

tweet_values.set(new_tweet_value);

context.write(key_value,tweet_values);

}

}

}

}

잡기(예외 전자){

e.printStackTrace();

}

}

}

[/코드]

기능을 감소: 이는 데이터를 응집을 담당. 집계는 키 값에 기초하여 이루어진다. 따라서 소트 처리 후의 응집을 완료하고, 클라이언트 프로그램에 결과를 다시 보낸다.

Listing3: 줄이기 기능은 처리 된 데이터를 집계

[코드]

패키지 com.mapreduce.techalpine;

수입 java.io.BufferedReader의;

수입 java.io.IOException;

수입 java.io.InputStreamReader;

수입 java.net.URI의;

수입 java.util.RandomAccess;

수입 java.util.regex.Matcher;

수입 있으며, java.util.regex.Pattern;

수입 org.apache.commons.logging.Log;

수입 org.apache.commons.logging.LogFactory;

수입 org.apache.hadoop.conf.Configuration;

수입 org.apache.hadoop.fs.FSDataOutputStream;

수입 org.apache.hadoop.fs.FileSystem;

수입 org.apache.hadoop.fs.Path;

수입 org.apache.hadoop.io.Text;

수입 org.apache.hadoop.mapreduce.Reducer;

/**

* @author Kaushik이 친구

*

* 이 감속기 기능입니다. 그것은에 기초하여 출력을 합산

* 키 - 값 쌍의 정렬.

*/

공용 클래스 DevXReducer는 감속기를 확장<본문 ,본문,본문,본문>

{

// 파일 경로에 대한 변수를 만듭니다

경로 positive_file_path;

경로 negative_file_path;

경로 output_file_path;

경로 keyword_file_path;

// 버퍼 변수를 만듭니다

의 BufferedReader의 positive_buff_reader;

의 BufferedReader의 negative_buff_reader;

의 BufferedReader keyword_buff_reader;

// 계산을위한 변수를 만듭니다

정적을 두 번 total_record_count = 새로운 더블(“0”);

정적을 두 번 count_neg = 새로운 더블(“0”);

정적을 두 번 count_pos = 새로운 더블(“0”);

정적을 두 번 count_neu = 새로운 더블(“0”);

정적을 두 번 percent_neg = 새로운 더블(“0”);

정적을 두 번 percent_pos = 새로운 더블(“0”);

정적을 두 번 percent_neu = 새로운 더블(“0”);

패턴 pattrn_matcher;

경기는 matcher_txt;

정적 INT NEW_ROW = 0;

FSDataOutputStream out_1st,out_2nd;

/**

* @param 키

* @param 값

* @param 컨텍스트

* @throws IOException이

* @throws 예외 : InterruptedException

*/

공공 무효 감소(텍스트 키, 의 Iterable<본문> 값,컨텍스트 컨텍스트) 예외 : IOException가 슬로우, 예외 : InterruptedException

{

// 감속기에 대한 구성을 만듭니다

구성 reduce_config = 새로운 구성();

// 로드 하둡 설정 파일

reduce_config.addResource(새로운 경로(“/사용자 / 하둡 / 코어를 site.xml”));

reduce_config.addResource(새로운 경로(“/사용자 / 하둡 / HDFS-에 site.xml”));

// 변수를 만듭니다

문자열 key_word = “”;

문자열 check_keyword = key_word;

keyword_file_path = 새로운 경로(“파일 / 저장소 / keys.txt”);

파일 시스템 file_system_read = FileSystem.get(URI.create(“파일 / 저장소 / keys.txt”),새로운 구성());

keyword_buff_reader = 새의 BufferedReader(새에 InputStreamReader(file_system_read.open(keyword_file_path)));

파일 시스템 get_filesys = FileSystem.get(reduce_config);

파일 시스템 get_filesys_posneg = FileSystem.get(reduce_config);

경로 path_output = 새로운 경로(“/사용자 / sentiment_output_file.txt”);

경로 path_output_posneg = 새로운 경로(“/사용자 / posneg_output_file.txt”);

// 키워드를 받기

while(keyword_buff_reader.ready())

{

key_word = keyword_buff_reader.readLine().손질();

}

// 파일 시스템 검사

면 (!get_filesys.exists(path_output)) {

out_1st = get_filesys.create(path_output);

out_2nd = get_filesys_posneg.create(path_output_posneg);

}

// 긍정과 부정 사전을 사용하여 키워드 검색을 확인

면(check_keyword.equals(key.toString().와 toLowerCase()))

{

용(텍스트 new_tweets:값)

{

// 긍정적 인 단어 사전을로드

positive_file_path = 새로운 경로(“/사용자 / map_reduce / pos_words.txt”);

파일 시스템 filesystem_one = FileSystem.get(URI.create(“파일 / pos_words.txt”),새로운 구성());

positive_buff_reader = 새의 BufferedReader(새에 InputStreamReader(filesystem_one.open(positive_file_path)));

// 부정적인 단어 disctinary를로드

negative_file_path = 새로운 경로(“/사용자 / map_reduce / neg_words.txt”);

파일 시스템 filesystem_two = FileSystem.get(URI.create(“파일 / neg_words.txt”),새로운 구성());

negative_buff_reader = 새의 BufferedReader(새에 InputStreamReader(filesystem_two.open(negative_file_path)));

++total_record_count;

부울 first_flag = 거짓;

부울 second_flag = 거짓;

문자열 all_tweets = new_tweets.toString();

문자열 first_regex = “”;

문자열 second_regex = “”;

while(positive_buff_reader.ready())

{

first_regex = positive_buff_reader.readLine().손질();

NEW_ROW ;

pattrn_matcher =는 Pattern.compile(first_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

first_flag = matcher_txt.find();

 

면(first_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(새 텍스트(first_regex),새 텍스트(all_tweets));

단절;

}

}

while(negative_buff_reader.ready())

{

NEW_ROW ;

second_regex = negative_buff_reader.readLine().손질();

pattrn_matcher =는 Pattern.compile(second_regex, Pattern.CASE_INSENSITIVE);

matcher_txt = pattrn_matcher.matcher(all_tweets);

second_flag = matcher_txt.find();

면(second_flag)

{

out_2nd.writeBytes(all_tweets);

context.write(새 텍스트(second_regex),새 텍스트(all_tweets));

단절;

}

}

면(first_flag&second_flag)

{

++count_neu;

}

그밖에

{

면(first_flag)

{

++count_pos;

}

면(second_flag)

{

++count_neg;

}

면(first_flag == 거짓&second_flag == 거짓)

{

++count_neu;

}

}

// 닫기 버퍼

negative_buff_reader.close();

positive_buff_reader.close();

}

// 백분율 값을 계산할

percent_pos = count_pos / total_record_count * 100;

percent_neg = count_neg / total_record_count * 100;

percent_neu = count_neu / total_record_count * 100;

시도{

// 파일에 쓰기

out_1st.writeBytes(“\N”+예어);

out_1st.writeBytes(“,”+total_record_count);

out_1st.writeBytes(“,”+percent_neg);

out_1st.writeBytes(“,”+percent_pos);

out_1st.writeBytes(“,”+percent_neu);

// 닫기 파일 시스템

out_1st.close();

get_filesys.close();

}잡기(예외 전자){

e.printStackTrace();

}

}

}

}

[/코드]

결론: 이 글에서 나는 자바 프로그래밍 환경을 사용하여 맵리 듀스 처리를 논의했습니다. 지도 축소 기능 등 다른 성분은 기본 작업을 수행하고, 클라이언트에 출력을 리턴. 프로세싱은 분산 환경에서 효과적으로 수행 만. 그래서 우리는 최선의 결과를 얻기 위해 분산 환경에 아파치 하둡 프레임 워크를 설정해야합니다.

희망 당신은 기사를 즐겼다 당신은 당신의 실제적인 프로그래밍을 구현할 수있을 것이다. Keep reading.

Tagged on: ,
============================================= ============================================== 아마존에서 최고의 Techalpine 책을 구입하십시오,en,전기 기술자 CT 밤나무 전기,en
============================================== ---------------------------------------------------------------- electrician ct chestnutelectric
error

Enjoy this blog? Please spread the word :)

Follow by Email
LinkedIn
LinkedIn
Share