溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapReduce的入門

發布時間:2020-07-01 04:55:04 來源:網絡 閱讀:385 作者:原生zzy 欄目:大數據

1. MapReduce 的介紹:

   MapReduce 是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個 Hadoop 集群上。
  MapReduce大體上分三個部分:
  - MRAppMaster:MapReduce Application Master,分配任務,協調任務的運行
  - MapTask:階段并發任,負責 mapper 階段的任務處理 YARNChild
  - ReduceTask:階段匯總任務,負責 reducer 階段的任務處理 YARNChild

2.MapReduce編寫代碼的流程:

  • 用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行 MR 程序的客戶端)
  • Mapper 的輸入數據是 KV 對的形式(KV 的類型可自定義)
  • Mapper 的輸出數據是 KV 對的形式(KV 的類型可自定義)
  • Mapper 中的業務邏輯寫在 map()方法中
  • map()方法(maptask 進程)對每一個<K,V>調用一次
  • Reducer 的輸入數據類型對應 Mapper 的輸出數據類型,也是 KV 對的形式
  • Reducer 的業務邏輯寫在 reduce()方法中
  • Reducetask 進程對每一組相同 k 的<K,V>組調用一次 reduce()方法
  • 用戶自定義的 Mapper 和 Reducer 都要繼承各自的父類
  • 整個程序需要一個 Drvier 來進行提交,提交的是一個描述了各種必要信息的 job 對象

3.WordCount 案例:

public class MyWordCount {
    public static void main(String[] args) {
        // 指定 hdfs 相關的參數
        Configuration conf=new Configuration(true);
        conf.set("fs.defaultFS","hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            // 新建一個 job 任務
            Job job=Job.getInstance(conf);
            // 設置 jar 包所在路徑
           job.setJarByClass(MyWordCount.class);
            // 指定 mapper 類和 reducer 類
            job.setMapperClass(Mapper.class);
            job.setReducerClass(MyReduce.class);

            // 指定 maptask 的輸出類型,注意,如果maptask的輸出類型與reducetask輸出類型一樣,mapTask可以不用設置
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 指定 reducetask 的輸出類型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // 指定該 mapreduce 程序數據的輸入和輸出路徑
            Path input=new Path("/data/input");
            Path output =new Path("/data/output");
            //一定要保證output不存在
            if(output.getFileSystem(conf).exists(output)){
                output.getFileSystem(conf).delete(output,true);  //遞歸刪除
            }
            FileInputFormat.addInputPath(job,input);
            FileOutputFormat.setOutputPath(job,output);

            // 最后提交任務
             boolean success = job.waitForCompletion(true);
             System.exit(success?0:-1);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
    private class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        Text mk =new Text();
        IntWritable mv=new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 計算任務代碼:切割單詞,輸出每個單詞計 1 的 key-value 對
             String[] words = value.toString().split("\\s+");
             for(String word:words){
                 mk.set(word);
                 context.write(mk,mv);
             }
        }
    }
    private class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
        IntWritable mv=new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum=0;
            // 匯總計算代碼:對每個 key 相同的一組 key-value 做匯總統計
            for(IntWritable value:values){
                sum+=value.get();
            }
            mv.set(sum);
            context.write(key,mv);
        }
    }
}

4. MapReduce 程序的核心運行機制:

1)MapReduce 程序的運行流程:
  • 一個 mr 程序啟動的時候,最先啟動的是 MRAppMaster,MRAppMaster 啟動后根據本次job 的描述信息,計算出需要的 maptask 實例數量,然后向集群申請機器啟動相應數量的maptask 進程
  • maptask 進程啟動之后,根據給定的數據切片(哪個文件的哪個偏移量范圍)范圍進行數據處理,主體流程為:
    • 利用客戶指定的 InputFormat 來獲取 RecordReader 讀取數據,形成輸入 KV 對
    • 將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,并將 map()方法輸出的 KV 對收集到緩存
    • 將緩存中的 KV 對按照 K 分區排序后不斷溢寫到磁盤文件
  • MRAppMaster 監控到所有 maptask 進程任務完成之后(真實情況是,某些 maptask 進程處理完成后,就會開始啟動 reducetask 去已完成的 maptask 處 fetch 數據),會根據客戶指定的參數啟動相應數量的 reducetask 進程,并告知 reducetask 進程要處理的數據范圍(數據分區)
  • Reducetask 進程啟動之后,根據 MRAppMaster 告知的待處理數據所在位置,從若干臺maptask 運行所在機器上獲取到若干個 maptask 輸出結果文件,并在本地進行重新歸并排序,然后按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()方法進行邏輯運算,并收集運算輸出的結果 KV,然后調用客戶指定的 OutputFormat 將結果數據輸出到外部存儲
    2)MapTask 并行度決定機制:

       一個 job 的 map 階段并行度由客戶端在提交 job 時決定,客戶端對 map 階段并行度的規劃的基本邏輯為:將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個 split),然后每一個 split 分配一個 mapTask 并行實例處理。這段邏輯及形成的切片規劃描述文件,是由FileInputFormat實現類的getSplits()方法完成的,小編后續會對MPjob提交過程的源碼進行詳細的分析。
       決定map task的個數主要由這幾個方面:
        -文件的大小
        - 文件的個數
        - block的大小
        - 邏輯切片的大小
       總的來說就是,當對文件進行邏輯劃分的時候,默認的劃分規則就是一個split和一個block的大小一樣,如果文件沒有到一個block大小,也會被切分出來一個split,這里有調優點,就是如果處理的文件都是小文件的話,那么機會并行很多的maptask,導致大量的時間都浪費在了啟動jvm上,此時可以通過合并小文件或者重用jvm的方式提高效率。
       邏輯切片機制
    long splitSize = computeSplitSize(blockSize, minSize, maxSize)
    blocksize:默認是 128M,可通過 dfs.blocksize 修改
    minSize:默認是 1,可通過 mapreduce.input.fileinputformat.split.minsize 修改
    maxsize:默認是 Long.MaxValue,可通過mapreduce.input.fileinputformat.split.maxsize 修改
    因此,如果是想調小split的大小,那么就將 maxsize調整到比block小。
    如果是想調大split的大小,那么就將minSize調整到比block大。

    3)ReduceTask 并行度決定機制:

       reducetask 的并行度同樣影響整個 job 的執行并發度和執行效率,但與 maptask 的并發數由切片數決定不同,Reducetask 數量的決定是可以直接手動設置:job.setNumReduceTasks(4);,默認是1個,如果設置為0個表示沒有reduce階段,當然也可以設置多個,根據需求,如果有些需要全局計數的操作,那么只能設置1個reduce,有些可以設置多個reducetask的,千萬不要設置太多,最好設置的和分區的個數能一一對應,不然的會就會有一些reduceTask空跑,導致了不能處理業務而且還占用系統資源。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女