溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop2.6.0學習筆記(五)自定義InputFormat和RecordReader

發布時間:2020-05-01 05:53:33 來源:網絡 閱讀:6353 作者:luchunli1985 欄目:大數據

魯春利的工作筆記,誰說程序員不能有文藝范?


 

TextInputFormat提供了對文本文件的處理方式,通過InputSplit進行分片(FileSplit),每一個分片分別new一個LineRecordReader進行讀取解析,解析得到的每一行以<key, value>的形式傳給Mapper的map()函數。


應用示例:隨機生成100個小數并求最大值。

MapReduce自帶的輸入類型都是基于HDFS的,本示例不從HDFS讀取數據,而是從內存中生成100個小數,然后求最大值。


自定義InputFormat

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description 自定義InputFormat
 */
public class RandomInputFormat extends InputFormat<IntWritable, ArrayWritable> {

    public static float [] floatValues = null;
    
    /** 自定義分片規則 **/
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException,
            InterruptedException {
        // 初始化數組的長度
        int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100);
        floatValues = new float[NumOfValues];
        
        Random random = new Random ();
        for (int i = 0; i < NumOfValues; i++) {
            floatValues[i] = random.nextFloat();
        }
        System.out.println("生成的隨機數的值如下:");
        for (float f : floatValues) {
            System.out.println(f);
        }
        System.out.println("====================");
        
        // 如下代碼表示指定兩個map task來處理這100個小數,每個map task處理50個小數
        // 初始化split分片數目,split分片的數量等于map任務的數量,但是也可以通過配置參數mapred.map.tasks來指定
        // 如果該參數和splits的切片數不一致時,map task的數目如何確定,后續再通過代碼分析
        int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2);
        int begin = 0;
        // Math.floor是為了下取整,這里是100剛好整除,如果是99的話Math.floor的值是49.0
        // 50
        int length = (int)Math.floor(NumOfValues / NumSplits);    
        // end = 49,第一個split的范圍就是0~49
        int end = length - 1;    
        
        // 默認的FileInputFormat類的getSplits方法中是通過文件數目和blocksize進行分的,
        // 文件超過一個塊會分成多個split,否則一個文件一個split分片
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < NumSplits - 1; i++) {    // 2個splits分片,分別為0和1
            RandomInputSplit split = new RandomInputSplit(begin, end);
            splits.add(split);
            
            // begin是上一個split切片的下一個值
            begin = end + 1;        // 50
            // 切片的長度不變,結束位置為起始位置+分片的長度,而數組下標是從0開始的,
            // 因此結束位置應該是begin加長度-1
            end = begin + (length - 1);    // 50 + (50 -1) = 99
        }
        RandomInputSplit split = new RandomInputSplit(begin, end);
        splits.add(split);
        
        /**
         * <pre>
         *     通過默認的TextInputFormat來實現的時候,如果有兩個小文件,則splits=2,參見:
         *     http://luchunli.blog.51cto.com/2368057/1676185
         * </pre>
         */
        
        return splits;
    }

    @Override
    public RecordReader<IntWritable, ArrayWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new RandomRecordReader();
    }
}

自定義InputSplit

package com.lucl.hadoop.mapreduce.rand;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
 * @author luchunli
 * @description 
 *     自定義InputSplit,參照了{@link org.apache.hadoop.mapreduce.lib.input.Filesplit}
 *  <br/>
 *     FileSplit是針對HDFS上文件的實現,因此其屬性包括文件絕對路徑(Path)、分片起始位置(start)、
 *     分片長度(length)、副本信息(保存Block復本數據到的主機數組)。
 *     <br/>
 *  自定義的InputSplit是針對內存中的數組數據進行的處理,因此無需記錄文件路徑及副本信息,只需要記錄對數組分片的起始位置、分片長度即可。
 * </pre>
 */
public class RandomInputSplit extends InputSplit implements Writable {
    private int start;
    private int end;
    private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class);
    
    public RandomInputSplit () {}
    
    /**
     * Constructs a split 
     * 
     * @param start
     * @param end 
     *
     */
    public RandomInputSplit (int start, int end) {
        this.start = start;
        this.end = end;
        
        int len = this.end - this.start + 1;
        int index = start;
        FloatWritable [] result = new FloatWritable[len];
        for (int i = 0; i < len; i++) {
            float f = RandomInputFormat.floatValues[index];
            FloatWritable fw = new FloatWritable(f);
            
            result[i] = fw;
            
            index++;
        }
        floatArray.set(result);
        
//        System.out.println("查看分片數據:");
//        for (FloatWritable fw : (FloatWritable[])floatArray.toArray()) {
//            System.out.println(fw.get());
//        }
//        System.out.println("=====================");
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return this.end - this.start;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[]{"dnode1", "dnode2"};
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.start = in.readInt();
        this.end = in.readInt();
        this.floatArray.readFields(in);        
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getStart());
        out.writeInt(this.getEnd());
        this.floatArray.write(out);
    }

    public int getStart() {
        return start;
    }

    public void setStart(int start) {
        this.start = start;
    }

    public int getEnd() {
        return end;
    }

    public void setEnd(int end) {
        this.end = end;
    }

    public ArrayWritable getFloatArray() {
        return floatArray;
    }

    @Override
    public String toString() {
        return this.getStart() + "-" + this.getEnd();
    }
}

自定義RecordReader

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description 自定義RecordReader
 *
 */
public class RandomRecordReader extends RecordReader<IntWritable, ArrayWritable> {
    private int start;
    private int end;
    private int index;

    private IntWritable key = null;
    private ArrayWritable value = null;
    private RandomInputSplit rsplit = null;
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.rsplit = (RandomInputSplit)split;
        this.start = this.rsplit.getStart();
        this.end = this.rsplit.getEnd();
        this.index = this.start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (null == key) {
            key = new IntWritable();
        }
        if (null == value) {
            value = new ArrayWritable(FloatWritable.class);
        }
        if (this.index <= this.end) {
            key.set(this.index);
            value = rsplit.getFloatArray();
            index = end + 1;
            return true;
        } 
        return false;
    }

    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public ArrayWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.index == this.end) {
            return 0F;
        }
        return Math.min(1.0F, (this.index - this.start) / (float)(this.end - this.start));
    }

    @Override
    public void close() throws IOException {
        // ......
    }

}

實現Mapper

package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Mapper
 */
public class RandomMapper extends Mapper<IntWritable, ArrayWritable, IntWritable, FloatWritable> {
    private static final IntWritable one = new IntWritable(1);
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 為了查看當前map是在那臺機器上執行的,在該機器上創建個隨機文件,
        // 執行完成后到DN節點對應目錄下查看即可
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        File file = new File("/home/hadoop", "Mapper-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }
    
    @Override
    protected void map(IntWritable key, ArrayWritable value, Context context)
            throws IOException, InterruptedException {
        FloatWritable [] floatArray = (FloatWritable[])value.toArray();
        float maxValue = floatArray[0].get();
        float tmp = 0;
        
        for (int i = 1; i < floatArray.length; i++) {
            tmp = floatArray[i].get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        
        // 這里必須要保證多個map輸出的key是一樣的,否則reduce處理時會認為是不同的數據,
        // shuffle會分成多個組,導致每個map task算出一個最大值
        context.write(one, new FloatWritable(maxValue));
    }
}

實現Reducer

package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Rducer
 */
public class RandomReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        // 為了查看當前reduce是在那臺機器上執行的,在該機器上創建個隨機文件
        File file = new File("/home/hadoop", "Reducer-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }
    
    @Override
    protected void reduce(IntWritable key, Iterable<FloatWritable> value, Context context)
            throws IOException, InterruptedException {
        Iterator<FloatWritable> it = value.iterator();
        float maxValue = 0;
        float tmp = 0;
        if (it.hasNext()) {
            maxValue = it.next().get();
        } else {
            context.write(new Text("The max value is : "), new FloatWritable(maxValue));
            return;
        }
        
        while (it.hasNext()) {
            tmp = it.next().get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        context.write(new Text("The max value is : "), new FloatWritable(maxValue));
    }
}

定義驅動類

package com.lucl.hadoop.mapreduce.rand;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

/**
 * @author luchunli
 * @description MapReduce自帶的輸入類都是基于HDFS的,如下示例代碼不用從HDFS上面讀取內容,
 * 而是在內存里面隨機生成100個(0-1)float類型的小數,然后求這100個小數的最大值。
 */
public class RandomDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new RandomDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        conf.set("lucl.random.nums", "100");
        conf.set("mapreduce.job.maps", "2");
        
        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        
        job.setJarByClass(RandomDriver.class);
        
        job.setInputFormatClass(RandomInputFormat.class);
        
        job.setMapperClass(RandomMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        
        job.setReducerClass(RandomReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
}

打包運行

[hadoop@nnode code]$ hadoop jar RandomMR.jar /201512020027
15/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
生成的隨機數的值如下:
0.020075738
0.700349
0.9617876
0.8286018
0.03357637
0.55033255
0.112645924
0.43312508
0.33184355
0.6960902
0.23912054
0.8523424
0.4133852
0.028242588
0.9031814
0.39397871
0.38278967
0.5842654
0.4569224
0.4008881
0.2230537
0.92889327
0.20127994
0.09574646
0.23173904
0.4365906
0.11567855
0.027944028
0.6965957
0.78311944
0.2365641
0.8575301
0.07472658
0.5219022
0.9409952
0.7122519
0.8722465
0.30288923
0.51773626
0.91211754
0.93172425
0.38484365
0.44844115
0.24589789
0.83361626
0.40983224
0.9444963
0.12061542
0.8446641
0.5303581
0.11295539
0.094003916
0.11822218
0.4997149
0.98296344
0.48746037
0.31420535
0.1151396
0.7904337
0.80005115
0.18344402
0.8171619
0.8749699
0.48023254
0.0044505
0.43879867
0.22367835
0.62924916
0.6998315
0.222148
0.7392884
0.4174865
0.4528237
0.70034826
0.3057149
0.29177833
0.22782367
0.8182611
0.46680295
0.4778521
0.6365823
0.43971914
0.27055055
0.26839674
0.5263245
0.8824649
0.51182485
0.20494783
0.7679403
0.31936407
0.13476872
0.47281688
0.3402111
0.28706527
0.038203478
0.7351879
0.6165404
0.41761196
0.5229257
0.7284225
====================
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:2
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_0014
15/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_0014
15/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/
15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_0014
15/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false
15/12/02 00:28:38 INFO mapreduce.Job:  map 0% reduce 0%
15/12/02 00:29:13 INFO mapreduce.Job:  map 100% reduce 0%
15/12/02 00:29:32 INFO mapreduce.Job:  map 100% reduce 100%
15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully
15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=26
                FILE: Number of bytes written=323256
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=520
                HDFS: Number of bytes written=31
                HDFS: Number of read operations=7
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=64430
                Total time spent by all reduces in occupied slots (ms)=16195
                Total time spent by all map tasks (ms)=64430
                Total time spent by all reduce tasks (ms)=16195
                Total vcore-seconds taken by all map tasks=64430
                Total vcore-seconds taken by all reduce tasks=16195
                Total megabyte-seconds taken by all map tasks=65976320
                Total megabyte-seconds taken by all reduce tasks=16583680
        Map-Reduce Framework
                Map input records=2
                Map output records=2
                Map output bytes=16
                Map output materialized bytes=32
                Input split bytes=520
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=32
                Reduce input records=2
                Reduce output records=1
                Spilled Records=4
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=356
                CPU time spent (ms)=1940
                Physical memory (bytes) snapshot=513851392
                Virtual memory (bytes) snapshot=2541506560
                Total committed heap usage (bytes)=257171456
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=0
        File Output Format Counters 
                Bytes Written=31
[hadoop@nnode code]$

查看輸出結果

[hadoop@nnode code]$ hdfs dfs -ls /201512020027
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-02 00:29 /201512020027/_SUCCESS
-rw-r--r--   2 hadoop hadoop         31 2015-12-02 00:29 /201512020027/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000
The max value is :      0.98296344
[hadoop@nnode code]$


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女