溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapReduce怎么使用

發布時間:2021-12-30 13:59:44 來源:億速云 閱讀:165 作者:iii 欄目:云計算

本篇內容主要講解“MapReduce怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“MapReduce怎么使用”吧!

  1. 什么是MR

    MR是一種分布計算模型,主要用來解決海量數據的計算問題的。它包含了兩種計算函數,一個是Mapping,另外一個是Reducing。Mapping對集合內的每個目標做同一個操作,Reduceing則是遍歷集合中的元素返回一個綜合的結果。我們操作代碼時,只需要重寫map和reduce方法就行,十分簡單。這兩個函數的形參都是k,v對,當數據量到達10PB以上時,則會速度變慢。

  2. MR執行過程

    MR程序啟動時,會把輸入文件轉化成<k1,v1>鍵值對傳給map函數,有幾個鍵值對就執行幾次map函數,但不是說有幾個鍵值對就有幾個Mapper進程,這是不對的。經過map函數處理,變成<k2,v2>鍵值對。由<k2,v2>轉變成reduce函數的輸入<k2,{v2,.....}>的過程被稱之為shuffle。shuffle并不是象map和reduce這樣的某個函數,不是需要單獨拿出節點運行的,它僅僅只是一個過程。<k2,{v2...}>進過reduce函數處理,變成了最后的輸出<k3,v3>。在到達reduce函數之前,鍵值對的數目是不變的。

                  Map階段

     (1).根據輸入文件解析成<k1,v1>對,每一對調用一次map函數

     (2).根據自己編寫的map函數,將鍵值對處理,變成新的<k2,v2>鍵值對輸出 

     (3).對輸出的鍵值對進行分區,不同分區對應著不同的Reducer進程

     (4).每個分區中的鍵值對,根據key進行排序,分組。然后把相同key的val放到同一個集合中。

      (5).進行規約(可選)

                     Reduce階段

         (1).多個map函數輸出的kv對,按照不同分區,傳輸到不同的reduce節點上。

          (2).將多個map函數輸出的kv對合并,排序。根據reduce函數邏輯,處理<k2,{v2..}>,轉換成新的鍵值對輸出

         (3).輸出保存文件

  3.簡單例子

      Wordcount

public class WordCount {
 public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
  Text k2=new Text();
  LongWritable v2=new LongWritable();
  @Override
  protected void map(LongWritable k1, Text v1,Context context)
    throws IOException, InterruptedException {
    String[] words=v1.toString().split("\t");
    for (String string : words) {
     k2.set(string);
     v2.set(1L);
    context.write(k2, v2);
   }
  }
 }
 public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
  LongWritable v3=new LongWritable();
  @Override
  protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
   long sum=0; 
   for (LongWritable longWritable : v2s) {
    sum=sum+longWritable.get();
   }
   v3.set(sum);
   context.write(k2, v3);
  }
  
 }
 public static void main(String[] args) throws Exception {
  Configuration conf=new Configuration();
  Job job=Job.getInstance(conf, WordCount.class.getSimpleName());
  job.setJarByClass(WordCount.class);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReduce.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  
  FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));
  
  job.waitForCompletion(true);
 } 
 
}

4.MR的序列化

   序列化就是把結構化的對象轉換為字節流,在MR中,他沒有用java自己的序列化,而是自己實現了一套序列化。因為相比較而言,hadoop的序列化有著諸多優點。在mr程序中,我們的參數和輸出的鍵值對全都是實現了序列化的對象,當我們需要自訂一個序列化對象,該如何操作呢?只需要實現Writable接口即可,當然key需要實現WritableComparable接口,因為需要根據key來排序和分組。

   接著有個小例子來展示序列化。就是電信流量的處理例子。

public class LiuLiang {
 public static class MyMapper extends Mapper<LongWritable, Text, Text, MyArrayWritable>{
  Text k2=new Text();
  MyArrayWritable v2=new MyArrayWritable();
  LongWritable v21=new LongWritable();
  LongWritable v22=new LongWritable();
  LongWritable v23=new LongWritable();
  LongWritable v24=new LongWritable();
  LongWritable[] values=new LongWritable[4];
  @Override
  protected void map(LongWritable k1, Text v1, Context context)
      throws IOException, InterruptedException {
    String[] words=v1.toString().split("\t");
    k2.set(words[1]);
    v21.set(Long.parseLong(words[6]));
    v22.set(Long.parseLong(words[7]));
    v23.set(Long.parseLong(words[8]));
    v24.set(Long.parseLong(words[9]));
    values[0]=v21;
    values[1]=v22;
    values[2]=v23;
    values[3]=v24;
    v2.set(values);
    context.write(k2, v2);
  }
 }
 public static class MyReduce extends Reducer<Text, MyArrayWritable, Text, Text>{
  Text v3=new Text();
  @Override
  protected void reduce(Text k2, Iterable<MyArrayWritable> v2s, Context context)
      throws IOException, InterruptedException {
    long sum1=0;
    long sum2=0;
    long sum3=0;
    long sum4=0;
    for (MyArrayWritable myArrayWritable : v2s) {
     Writable[] values= myArrayWritable.get();
     sum1=sum1+((LongWritable)values[0]).get();
     sum2=sum2+((LongWritable)values[1]).get();
     sum3=sum3+((LongWritable)values[2]).get();
     sum4=sum4+((LongWritable)values[3]).get();
   }
    v3.set("\t"+sum1+"\t"+sum2+"\t"+sum3+"\t"+sum4);
    context.write(k2, v3);
  }
 }
 public static void main(String[] args) throws Exception {
  Configuration conf=new Configuration();
  Job job=Job.getInstance(conf, LiuLiang.class.getSimpleName());
  job.setJarByClass(LiuLiang.class);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReduce.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(MyArrayWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  
  FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));
  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));
  
  job.waitForCompletion(true);
 } 
}
class MyArrayWritable extends ArrayWritable{
 public MyArrayWritable(){
  super(LongWritable.class);
 }
 public MyArrayWritable(String[] arg0) {
  super(arg0);
 }
 
}

5.SequenceFile

    在HDFS的學習中,提到了小文件的解決方案,其中一個便是這個SequenceFile。他是一種無序存儲,將kv對序列化到文件中,從而合并許多小文件并且支持壓縮。缺點是必須遍歷才能查看里面各個小文件。

public class SequenceFileTest {
 public static void main(String[] args) throws Exception{
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");
  //Write(conf, fileSystem);
  Read(conf, fileSystem);
 }
 
 
 private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {
  Reader reader=new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);
  Text key=new Text();
  Text val=new Text();
  while(reader.next(key, val)){
   System.out.println(key.toString()+"----"+val.toString());
  }
  IOUtils.closeStream(reader);
 }
 
 
 private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {
  Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);
  Collection<File> files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);
  for (File file : files) {
   Text text = new Text();
   text.set(FileUtils.readFileToString(file));
   writer.append(new Text(file.getName()), text);
  }
  IOUtils.closeStream(writer);
 }
}

到此,相信大家對“MapReduce怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女