大數據學習路線之mapreduce概述,mapreduce:分布式并行離線計算框架,是一個分布式運算程序的編程框架,是用戶開發“基于hadoop的數據分析應用”的核心框架;Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個hadoop集群上;
與HDFS解決問題的原理類似,HDFS是將大的文件切分成若干小文件,然后將它們分別存儲到集群中各個主機中。
同樣原理,mapreduce是將一個復雜的運算切分成若個子運算,然后將它們分別交給集群中各個主機,由各個主機并行運算。
1.1 mapreduce產生的背景
海量數據在單機上處理因為硬件資源限制,無法勝任。
而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度。
引入mapreduce框架后,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜×××由框架來處理。
1.2 mapreduce編程模型
一種分布式計算模型。
MapReduce將這個并行計算過程抽象到兩個函數。
Map(映射):對一些獨立元素組成的列表的每一個元素進行指定的操作,可以高度并行。
Reduce(化簡 歸約):對一個列表的元素進行合并。
一個簡單的MapReduce程序只需要指定map()、reduce()、input和output,剩下的事由框架完成。
Job :用戶的每一個計算請求稱為一個作業。
Task:每一個作業,都需要拆分開了,交由多個主機來完成,拆分出來的執行單位就是任務。
Task又分為如下三種類型的任務:
Map:負責map階段的整個數據處理流程
Reduce:負責reduce階段的整個數據處理流程
MRAppMaster:負責整個程序的過程調度及狀態協調
具體流程說明:
一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程
maptask進程啟動之后,根據給定的數據切片范圍進行數據處理,主體流程為:
–?利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對。
–?將輸入KV(k是文件的行號,v是文件一行的數據)對傳遞給客戶定義的map()方法,做邏輯運算,并將map()方法輸出的KV對收集到緩存。
–?將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件
MRAppMaster監控到所有maptask進程任務完成之后,會根據客戶指定的參數啟動相應數量的reducetask進程,并告知reducetask進程要處理的數據范圍(數據分區)
Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,并在本地進行重新歸并排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲
基于MapReduce 計算模型編寫分布式并行程序非常簡單,程序員的主要編碼工作就是實現Map 和Reduce函數。
其它的并行編程中的種種復雜問題,如分布式存儲,工作調度,負載平衡,容錯處理,網絡通信等,均由YARN框架負責處理。
MapReduce中,map和reduce函數遵循如下常規格式:
?map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
Mapper的接口:
?protected?void?map(KEY key, VALUE value, Context?context)?
????throws?IOException, InterruptedException?{ ?
}
Reduce的接口:
?protected?void?reduce(KEY key, Iterable<VALUE> values,
?Context?context)?throws?IOException, InterruptedException?{?
}
Mapreduce程序代碼基本結構
用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
Mapper的輸入數據是KV對的形式(KV的類型可自定義)
Mapper的輸出數據是KV對的形式(KV的類型可自定義)
Mapper中的業務邏輯寫在map()方法中
map()方法(maptask進程)對每一個<K,V>調用一次
Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
Reducer的業務邏輯寫在reduce()方法中
Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
用戶自定義的Mapper和Reducer都要繼承各自的父類
整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
需求:有一批文件(規模為TB級或者PB級),如何統計這些文件中所有單詞出現次數
?如有三個文件,文件名是qfcourse.txt、qfstu.txt 和 qf_teacher
?qf_course.txt內容:
?php java linux
bigdata VR
C C++ java web
linux shell
?qf_stu.txt內容:
?tom jim lucy
lily sally
andy
tom jim sally
?qf_teacher內容:
?jerry Lucy tom
jim
方案
–?分別統計每個文件中單詞出現次數?- map()
–?累加不同文件中同一個單詞出現次數?- reduce()
實現代碼
–?創建一個簡單的maven項目
–?添加hadoop client依賴的jar,pom.xml主要內容如下:
?<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
?
–?編寫代碼
–?自定義一個mapper類
?import?java.io.IOException;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.LongWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Mapper;
??/**
???*?Maper里面的泛型的四個類型從左到右依次是:
???*?
???*?LongWritable KEYIN:?默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long,??類似于行號但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable?
???*?Text VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String,同上,用Text
???*
???*?Text KEYOUT:是用戶自定義邏輯處理完成之后輸出數據中的key,在此處是單詞,String,同上,用Text
???*?IntWritable VALUEOUT:是用戶自定義邏輯處理完成之后輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable
???*/
??public?class?WordcountMapper?extends?Mapper<LongWritable, Text, Text, IntWritable>{
?? /**
?? ?*?map階段的業務邏輯就寫在自定義的map()方法中
?? ?*?maptask會對每一行輸入數據調用一次我們自定義的map()方法
?? ?*/
?? @Override
?? protected?void?map(LongWritable key, Text?value, Context?context)?throws?IOException, InterruptedException?{
??
?? //將maptask傳給我們的一行的文本內容先轉換成String
?? String?line = value.toString();
?? //根據空格將這一行切分成單詞
?? String[] words = line.split(" ");
??
?? /**
?? ?*將單詞輸出為<單詞,1>?
?? ?*如<lily,1>?<lucy,1>??<c,1>?<c++,1>?<tom,1>?
?? ?*/
?? for(String?word:words){
?? //將單詞作為key,將次數1作為value,以便于后續的數據分發,可以根據單詞分發,以便于相同單詞會到相同的reduce task
?? context.write(new?Text(word),?new?IntWritable(1));
?? }
?? }
??}
?
–?自定義一個reduce類
??import?java.io.IOException;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Reducer;
??/**
???*?Reducer里面的泛型的四個類型從左到右依次是:
???*? Text KEYIN:?對應mapper輸出的KEYOUT
???*? IntWritable VALUEIN:?對應mapper輸出的VALUEOUT
???*?
???*? KEYOUT,?是單詞
???*? VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型,是總次數
???*/
??public?class?WordcountReducer?extends?Reducer<Text, IntWritable, Text, IntWritable>{
?? /**
?? ?*?<tom,1>
?? ?*?<tom,1>
?? ?*?<linux,1>
?? ?*?<banana,1>
?? ?*?<banana,1>
?? ?*?<banana,1>
?? ?*?入參key,是一組相同單詞kv對的key
?? ?*?values是若干相同key的value集合
?? ?*?如?<tom,[1,1]>???<linux,[1]>???<banana,[1,1,1]>
?? ?*/
?? @Override
?? protected?void?reduce(Text?key, Iterable<IntWritable> values, Context?context)?throws?IOException, InterruptedException?{
?? int?count=0; ?//累加單詞的出現的次數
??
?? for(IntWritable value:values){
?? count += value.get();
?? }
?? context.write(key,?new?IntWritable(count));
?? }
??}
?
–?編寫一個Driver類
???import?org.apache.hadoop.conf.Configuration;
??import?org.apache.hadoop.fs.Path;
??import?org.apache.hadoop.io.IntWritable;
??import?org.apache.hadoop.io.Text;
??import?org.apache.hadoop.mapreduce.Job;
??import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
??import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
??/**
???*?相當于一個yarn集群的客戶端
???*?需要在此封裝我們的mr程序的相關運行參數,指定jar包
???*?最后提交給yarn
???*/
??public?class?WordcountDriver {
?? /**
?? ?*?該類是運行在hadoop客戶端的,main一運行,yarn客戶端就啟動起來了,與yarn服務器端通信
?? ?*?yarn服務器端負責啟動mapreduce程序并使用WordcountMapper和WordcountReducer類
?? ?*/
?? public?static?void?main(String[] args)?throws?Exception?{
?? //此代碼需要兩個輸入參數 ?第一個參數支持要處理的源文件;第二個參數是處理結果的輸出路徑
?? if?(args ==?null?|| args.length?== 0) {
?? args =?new?String[2];
?????????????//路徑都是 hdfs系統的文件路徑
?? args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
?? args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
?? }
?? /**
?? ?*?什么也不設置時,如果在安裝了hadoop的機器上運行時,自動讀取
?? ?*?/home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
?? ?*?文件放入Configuration中
?? ?*/
?? Configuration?conf =?new?Configuration();
?? Job job = Job.getInstance(conf);
??
?? //指定本程序的jar包所在的本地路徑
?? job.setJarByClass(WordcountDriver.class);
??
?? //指定本業務job要使用的mapper業務類
?? job.setMapperClass(WordcountMapper.class);
?? //指定mapper輸出數據的kv類型
?? job.setMapOutputKeyClass(Text.class);
?? job.setMapOutputValueClass(IntWritable.class);
????????
?????????//指定本業務job要使用的Reducer業務類
?????????job.setReducerClass(WordcountReducer.class);
?? //指定最終輸出的數據的kv類型
?? job.setOutputKeyClass(Text.class);
?? job.setOutputValueClass(IntWritable.class);
??
?? //指定job的輸入原始文件所在目錄
?? FileInputFormat.setInputPaths(job,?new?Path(args[0]));
?? //指定job的輸出結果所在目錄
?? FileOutputFormat.setOutputPath(job,?new?Path(args[1]));
??
?? //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行
?? /*job.submit();*/
?? boolean?res = job.waitForCompletion(true);
?? System.exit(res?0:1);
?? }
??}
wordcount處理過程
將文件拆分成splits,由于測試用的文件較小,所以每個文件為一個split,并將文件按行分割形成<key,value>對,下圖所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所占的字符數(Windows/Linux環境不同)。
將分割好的<key,value>對交給用戶定義的map方法進行處理,生成新的<key,value>對,下圖所示。
得到map方法輸出的<key,value>對后,Mapper會將它們按照key值進行排序,并執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。下圖所示。
Reducer先對從Mapper接收的數據進行排序,再交由用戶自定義的reduce方法進行處理,得到新的<key,value>對,并作為WordCount的輸出結果,下圖所示。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。