This article explains how to use MapReduce. The techniques shown are simple to follow and practical to apply.
What is MR
MR is a distributed computing model designed to solve the problem of computing over massive data sets. It is built around two functions: Mapping and Reducing. Mapping applies the same operation to every element of the input set, while Reducing traverses the elements of a set and folds them into a single aggregated result. When writing a program, we only need to override the map and reduce methods, which is quite simple. Both functions take key/value pairs as parameters. Note that MR is a batch model: once the data volume grows beyond roughly 10 PB, processing becomes noticeably slow.
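As a minimal sketch of what "just override map and reduce" looks like with the Hadoop mapreduce API (the class names SkeletonMapper/SkeletonReducer and the concrete Writable types here are chosen only for illustration; a real job picks types that match its data):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Skeleton of the two methods an MR job overrides.
class SkeletonMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // turn one input record into zero or more <k2, v2> pairs
    }
}

class SkeletonReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // fold all values that share a key into one <k3, v3> output pair
    }
}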
MR execution flow
When an MR program starts, the input file is parsed into <k1,v1> key/value pairs and fed to the map function; the map function is invoked once per pair. This does not mean there is one Mapper process per key/value pair; that reading is incorrect. After processing by the map function, the data becomes <k2,v2> pairs. The step that turns the <k2,v2> pairs into the reduce function's input <k2,{v2,...}> is called shuffle. Shuffle is not a user-defined function like map or reduce and does not run on a dedicated node of its own; it is simply a phase of the job. The <k2,{v2,...}> groups are then processed by the reduce function into the final output <k3,v3>. From the map output until the data reaches the reduce function, the number of key/value pairs does not change.
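As a concrete trace (using word counting, the example from section 3, purely for illustration): an input line containing the words hello, world, hello arrives at map as <0, "hello world hello"> where 0 is the byte offset; map emits <hello,1>, <world,1>, <hello,1>; shuffle groups these across all map outputs into <hello,{1,1}> and <world,{1}>; reduce sums each group and writes <hello,2> and <world,1>. The three pairs produced by map are still three values when they arrive at reduce, only regrouped by key.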
Map phase
(1). The input file is parsed into <k1,v1> pairs, and the map function is called once for each pair.
(2). The user-written map function processes each pair and emits new <k2,v2> pairs.
(3). The output pairs are partitioned; each partition goes to a different Reducer process.
(4). Within each partition, the pairs are sorted and grouped by key, and all values of the same key are collected into one set.
(5). An optional combine (local reduction) step is applied (see the configuration sketch after this list).
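A hedged sketch of how the partitioning from step (3) and the optional combiner from step (5) are wired into a job driver; these two calls would sit next to the other job.set...() calls in the WordCount driver shown in section 3, and reusing MyReduce as the combiner only works because its summing logic is associative:

// Sketch: extra driver configuration for partitioning and map-side combining.
// HashPartitioner (hash of key modulo reducer count) is Hadoop's default partitioner;
// it is set explicitly here only for illustration.
job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
job.setCombinerClass(MyReduce.class);   // optional local pre-aggregation of map output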
Reduce phase
(1). The key/value pairs output by the various map tasks are transferred, partition by partition, to the corresponding reduce nodes.
(2). The pairs from the different map outputs are merged and sorted; the reduce function then processes each <k2,{v2,...}> group according to its logic and emits new key/value pairs.
(3). The output is saved to files.
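The number of reduce tasks determines how many partitions exist and how many output files step (3) produces (one part-r-NNNNN file per reduce task). A one-line sketch of how it is set in the driver:

job.setNumReduceTasks(2);   // two reduce tasks -> two partitions and two output files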
3. A simple example
WordCount
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Split each input line on tabs and emit <word, 1> for every word
            String[] words = v1.toString().split("\t");
            for (String word : words) {
                k2.set(word);
                v2.set(1L);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v3 = new LongWritable();

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum all counts that belong to the same word
            long sum = 0;
            for (LongWritable count : v2s) {
                sum = sum + count.get();
            }
            v3.set(sum);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCount.class.getSimpleName());
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));
        job.waitForCompletion(true);
    }
}
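To run the example, the class is typically packaged into a jar and submitted with a command such as hadoop jar wordcount.jar WordCount (the jar name here is hypothetical). The input and output HDFS paths are hardcoded in main(), and the out4 output directory must not already exist when the job starts, otherwise FileOutputFormat will refuse to run the job.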
4. Serialization in MR
Serialization converts a structured object into a byte stream. MR does not use Java's built-in serialization; Hadoop implements its own serialization framework, which by comparison has several advantages (it is more compact and faster for this use case). In an MR program, all of the keys and values we pass around are serializable objects. So how do we define a custom serializable type? Simply implement the Writable interface; a type used as a key must implement the WritableComparable interface instead, because keys need to be sorted and grouped.
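A minimal sketch of a custom value type under the standard Hadoop Writable contract (the class and field names are purely illustrative): it needs a no-argument constructor plus write() and readFields() implementations that serialize the fields in the same order.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Illustrative custom value type holding two longs (e.g. upload/download bytes).
class TrafficWritable implements Writable {
    private long up;
    private long down;

    public TrafficWritable() {}          // required no-arg constructor

    public void set(long up, long down) {
        this.up = up;
        this.down = down;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(up);               // field order must match readFields
        out.writeLong(down);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        up = in.readLong();
        down = in.readLong();
    }
}

The MyArrayWritable class in the next example takes a shortcut: it inherits write() and readFields() from Hadoop's ArrayWritable instead of implementing them by hand.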
The following small example, which processes telecom traffic records, demonstrates serialization.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LiuLiang {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, MyArrayWritable> {
        Text k2 = new Text();
        MyArrayWritable v2 = new MyArrayWritable();
        LongWritable v21 = new LongWritable();
        LongWritable v22 = new LongWritable();
        LongWritable v23 = new LongWritable();
        LongWritable v24 = new LongWritable();
        LongWritable[] values = new LongWritable[4];

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Fields are tab-separated: field 1 is the phone number,
            // fields 6-9 (0-based) are the four traffic counters
            String[] words = v1.toString().split("\t");
            k2.set(words[1]);
            v21.set(Long.parseLong(words[6]));
            v22.set(Long.parseLong(words[7]));
            v23.set(Long.parseLong(words[8]));
            v24.set(Long.parseLong(words[9]));
            values[0] = v21;
            values[1] = v22;
            values[2] = v23;
            values[3] = v24;
            v2.set(values);
            context.write(k2, v2);
        }
    }

    public static class MyReduce extends Reducer<Text, MyArrayWritable, Text, Text> {
        Text v3 = new Text();

        @Override
        protected void reduce(Text k2, Iterable<MyArrayWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Accumulate the four counters across all records of the same phone number
            long sum1 = 0;
            long sum2 = 0;
            long sum3 = 0;
            long sum4 = 0;
            for (MyArrayWritable myArrayWritable : v2s) {
                Writable[] values = myArrayWritable.get();
                sum1 = sum1 + ((LongWritable) values[0]).get();
                sum2 = sum2 + ((LongWritable) values[1]).get();
                sum3 = sum3 + ((LongWritable) values[2]).get();
                sum4 = sum4 + ((LongWritable) values[3]).get();
            }
            v3.set("\t" + sum1 + "\t" + sum2 + "\t" + sum3 + "\t" + sum4);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, LiuLiang.class.getSimpleName());
        job.setJarByClass(LiuLiang.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MyArrayWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));
        job.waitForCompletion(true);
    }
}

// Custom array type: inherits Writable behavior from ArrayWritable,
// fixing the element type to LongWritable.
class MyArrayWritable extends ArrayWritable {
    public MyArrayWritable() {
        super(LongWritable.class);
    }

    public MyArrayWritable(String[] values) {
        super(values);
    }
}
5. SequenceFile
The HDFS material mentioned solutions to the small-file problem, and SequenceFile is one of them. It is an unordered store: key/value pairs are serialized into a single file, which lets us merge many small files into one and also supports compression. The drawback is that the file must be scanned sequentially to inspect the individual small files inside it.
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class SequenceFileTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");
        // Write(conf, fileSystem);
        Read(conf, fileSystem);
    }

    // Read every <fileName, content> pair back out of the SequenceFile
    private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {
        Reader reader = new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);
        Text key = new Text();
        Text val = new Text();
        while (reader.next(key, val)) {
            System.out.println(key.toString() + "----" + val.toString());
        }
        IOUtils.closeStream(reader);
    }

    // Pack every local .txt file under F:\ceshi1 into one SequenceFile,
    // using the file name as key and the file content as value
    private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {
        Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);
        Collection<File> files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);
        for (File file : files) {
            Text text = new Text();
            text.set(FileUtils.readFileToString(file));
            writer.append(new Text(file.getName()), text);
        }
        IOUtils.closeStream(writer);
    }
}