This article walks through Hadoop's reduce side: how Map output reaches the Reducers and how the reduce results are written out.
Map output is distributed to the Reducers via the partition step; once a Reducer has finished its reduce operation, the results are written out through an OutputFormat.
/*
 * Licensed to the Apache Software Foundation (ASF) under one ...
 */
package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.RawKeyValueIterator;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 */
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input,
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter,
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}
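A user-defined Reducer simply subclasses this class and overrides whichever of these methods it needs. As a minimal, hypothetical sketch (the class name MaxValueReducer and the counter it keeps are made up for illustration), a reducer that emits only the largest value seen for each key could look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical example: for each key, emit only the maximum of its values.
public class MaxValueReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable max = new IntWritable();
  private long keysSeen;                         // simple per-task statistic

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    keysSeen = 0;                                // runs once, before the first reduce() call
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int best = Integer.MIN_VALUE;
    for (IntWritable v : values) {               // all values grouped under this key
      best = Math.max(best, v.get());
    }
    max.set(best);
    context.write(key, max);                     // one output record per key
    keysSeen++;
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // runs once, after the last reduce() call
    context.getCounter("example", "keys_reduced").increment(keysSeen);
  }
}

The framework's run method shown above is what drives these calls: setup once, reduce once per key, cleanup once at the end.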
A Mapper's output may first be sent to an optional Combiner for local merging. The Combiner has no base class of its own in the framework; Reducer serves as the Combiner's base class. The two expose the same functionality and differ only in where they run and in the context in which they are used.
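Plugging a Reducer in as a Combiner is purely a job-configuration decision. Below is a sketch that uses the library's IntSumReducer for both roles; this is only safe when the reduce operation is associative and commutative (summing is, computing an average is not), and the class name CombinerSetup is made up:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// Sketch: the same Reducer class serves as both combiner and reducer.
public class CombinerSetup {
  public static Job configure(Configuration conf) throws IOException {
    Job job = new Job(conf, "sum example");      // Job.getInstance(conf, ...) in newer releases
    job.setCombinerClass(IntSumReducer.class);   // merges map output locally, before the shuffle
    job.setReducerClass(IntSumReducer.class);    // merges the shuffled output on the reduce side
    return job;
  }
}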
The <key, value> pairs that a Mapper finally produces must be sent to the Reducers to be merged, and during that merge all pairs sharing the same key are delivered to the same Reducer. Which key goes to which Reducer is decided by the Partitioner. It has a single method whose inputs are a Map output pair <key, value> and the number of Reducers, and whose output is the assigned Reducer (an integer index). The default Partitioner is HashPartitioner, which takes the key's hash value modulo the number of Reducers to pick the target Reducer.
/*
 * Licensed to the Apache Software Foundation (ASF) under one ...
 */
package org.apache.hadoop.mapreduce;

/**
 * Partitions the key space.
 */
public abstract class Partitioner<KEY, VALUE> {

  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

/*
 * Licensed to the Apache Software Foundation (ASF) under one ...
 */
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
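Writing a custom Partitioner therefore only means implementing getPartition. The following hypothetical sketch (the class name FirstCharPartitioner is made up) routes keys by their first character, so that all keys sharing an initial letter land on the same reduce task:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: keys with the same first character go to the same reducer.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    if (key.getLength() == 0) {
      return 0;                                  // send empty keys to partition 0
    }
    char first = Character.toLowerCase(key.toString().charAt(0));
    // Mask off the sign bit, as HashPartitioner does, then take the modulus.
    return (first & Integer.MAX_VALUE) % numPartitions;
  }
}

It would be enabled with job.setPartitionerClass(FirstCharPartitioner.class); the framework then calls getPartition once for every Map output record.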
Reducer is the base class for all user-defined Reducer classes. Like Mapper, it has setup, reduce, cleanup and run methods; setup and cleanup mean the same as in Mapper, and reduce is where the Mapper output is actually merged. Its inputs are a key, an iterator over all the values belonging to that key, and the Reducer's context. The framework defines two very simple Reducers, IntSumReducer and LongSumReducer, which sum int and long values respectively.
/*
 * Licensed to the Apache Software Foundation (ASF) under one ...
 */
package org.apache.hadoop.mapreduce.lib.reduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
                                                Key,IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Key key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }

}
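LongSumReducer follows exactly the same pattern, only with LongWritable values; a sketch of that pattern (not the verbatim library source) looks like this:

package org.apache.hadoop.mapreduce.lib.reduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Same idea as IntSumReducer, but summing long values.
public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable, KEY, LongWritable> {

  private LongWritable result = new LongWritable();

  public void reduce(KEY key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}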
The reduce results are written to files through the write method of Reducer.Context, and, mirroring the input side, Hadoop introduces OutputFormat for this. OutputFormat relies on two helper interfaces, RecordWriter and OutputCommitter, to handle the output. RecordWriter provides a write method for emitting <key, value> pairs and a close method for closing the corresponding output. OutputCommitter provides a set of methods that users can implement to customize the special actions needed at certain stages of an OutputFormat's life cycle. We discussed those methods with TaskInputOutputContext (clearly, TaskInputOutputContext is the bridge between OutputFormat and Reducer).
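To make that division of labour concrete, here is a hypothetical, heavily simplified file-based output format whose RecordWriter writes one "key<TAB>value" line per record; real classes such as TextOutputFormat additionally handle separators, encodings and compression, and the class name SimpleLineOutputFormat is made up:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical, much-simplified output format: one "key<TAB>value" line per record.
public class SimpleLineOutputFormat<K, V> extends FileOutputFormat<K, V> {

  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // FileOutputFormat chooses a per-task file under the task's work directory.
    Path file = getDefaultWorkFile(context, ".txt");
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);

    return new RecordWriter<K, V>() {
      @Override
      public void write(K key, V value) throws IOException {
        out.writeBytes(key + "\t" + value + "\n"); // called once per output record
      }

      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();                               // called when the task has finished writing
      }
    };
  }
}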
OutputFormat and RecordWriter correspond to InputFormat and RecordReader respectively. The framework provides the empty output NullOutputFormat (it outputs nothing; the NullOutputFormat.RecordWriter shown in the class diagram is only illustrative and is not defined as a named class in the code), LazyOutputFormat (not shown in the class diagram and not analyzed here), FilterOutputFormat (not analyzed), and the file-based FileOutputFormat outputs SequenceFileOutputFormat and TextOutputFormat.
The file-based output FileOutputFormat works together with a number of configuration items: mapred.output.compress (whether to compress the output), mapred.output.compression.codec (the compression codec), mapred.output.dir (the output path), and mapred.work.output.dir (the working output path). FileOutputFormat also relies on FileOutputCommitter, which provides some job- and task-related temporary-file management. For example, FileOutputCommitter's setupJob creates a temporary directory named _temporary under the output path, and cleanupJob deletes it.
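These properties are normally not set by hand; FileOutputFormat exposes static helpers that fill them in. A sketch follows, in which the output path and the choice of GzipCodec are arbitrary examples and the class name OutputConfig is made up:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch: configure output location and compression through FileOutputFormat's
// static helpers, which set the mapred.output.* properties described above.
public class OutputConfig {
  public static void configureOutput(Job job) {
    FileOutputFormat.setOutputPath(job, new Path("/tmp/demo-output")); // mapred.output.dir
    FileOutputFormat.setCompressOutput(job, true);                     // mapred.output.compress
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);   // mapred.output.compression.codec
  }
}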
SequenceFileOutputFormat and TextOutputFormat are the output-side counterparts of SequenceFileInputFormat and TextInputFormat respectively.
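In a job driver, choosing between the two is again just configuration; a sketch (the BLOCK compression type is only an example, and the class name FormatChoice is made up):

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Sketch: pick the on-disk representation of the reduce output.
public class FormatChoice {

  // Plain "key<TAB>value" text lines, readable later by TextInputFormat.
  public static void useText(Job job) {
    job.setOutputFormatClass(TextOutputFormat.class);
  }

  // Binary <key, value> records that a downstream job can read back
  // with SequenceFileInputFormat.
  public static void useSequenceFile(Job job) {
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
  }
}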