溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Hadoop中怎么實現MapReduce的數據輸入

發布時間:2021-12-22 17:22:10 來源:億速云 閱讀:192 作者:iii 欄目:云計算
# Hadoop中怎么實現MapReduce的數據輸入

## 1. MapReduce數據輸入概述

MapReduce作為Hadoop的核心計算框架,其數據處理流程始于數據輸入階段。數據輸入機制決定了MapReduce作業如何讀取原始數據并將其轉化為可供Mapper處理的鍵值對形式。在Hadoop生態系統中,數據輸入過程具有以下核心特點:

1. **分布式特性**:輸入數據通常存儲在HDFS上,被自動劃分為多個數據塊(默認128MB)
2. **并行處理**:每個數據分片(InputSplit)由一個獨立的Mapper任務處理
3. **格式無關性**:通過InputFormat抽象類支持多種數據格式處理
4. **位置感知**:遵循"移動計算比移動數據更高效"原則,盡量在數據所在節點執行計算

## 2. 核心組件與工作機制

### 2.1 InputFormat抽象類

作為數據輸入的最高層抽象,InputFormat定義了兩個核心職責:

```java
public abstract class InputFormat<K, V> {
    // 獲取輸入數據的分片信息
    public abstract List<InputSplit> getSplits(JobContext context);
    
    // 創建RecordReader用于讀取分片數據
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}

2.2 InputSplit實現

InputSplit表示邏輯上的數據分片,包含以下關鍵信息:

  1. 分片長度(字節數)
  2. 分片位置信息(存儲節點列表)
  3. 分片數據的位置標識
public abstract class InputSplit {
    public abstract long getLength();
    public abstract String[] getLocations();
}

2.3 RecordReader組件

RecordReader負責將InputSplit轉化為具體的鍵值對記錄:

public abstract class RecordReader<KEYIN, VALUEIN> {
    // 初始化方法
    public abstract void initialize(InputSplit split, TaskAttemptContext context);
    
    // 讀取下一條記錄
    public abstract boolean nextKeyValue();
    
    // 獲取當前鍵
    public abstract KEYIN getCurrentKey();
    
    // 獲取當前值
    public abstract VALUEIN getCurrentValue();
    
    // 獲取進度
    public abstract float getProgress();
    
    // 關閉資源
    public abstract void close();
}

3. 內置輸入格式實現

3.1 TextInputFormat(默認文本輸入)

處理純文本文件的默認實現,特點包括: - 每行文本記錄 - 鍵為LongWritable類型(字節偏移量) - 值為Text類型(行內容)

// 典型使用方式
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);

3.2 KeyValueTextInputFormat

處理鍵值對文本的特殊格式: - 每行格式為”key[分隔符]value” - 默認分隔符是制表符(\t) - 可通過mapreduce.input.keyvaluelinerecordreader.key.value.separator配置

3.3 SequenceFileInputFormat

二進制文件輸入格式,支持三種類型: 1. SequenceFileInputFormat: 通用型 2. SequenceFileAsTextInputFormat: 將鍵值轉為Text對象 3. SequenceFileAsBinaryInputFormat: 原始二進制格式

3.4 復合輸入格式

CompositeInputFormat 用于多路徑輸入場景: - 支持多個輸入路徑的聯合查詢 - 通過join表達式定義數據關聯方式 - 典型應用場景:Map端join操作

// 配置示例
conf.set("mapreduce.join.expr", 
    CompositeInputFormat.compose("inner", 
    KeyValueTextInputFormat.class, "/path1", "/path2"));

4. 自定義輸入格式實現

4.1 實現場景

需要自定義輸入格式的典型場景: - 處理非標準格式數據(如二進制協議) - 需要特殊分片邏輯(如不可分割的壓縮格式) - 多數據源組合輸入 - 需要預處理原始數據

4.2 實現步驟

4.2.1 繼承InputFormat

public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) {
        return new CustomRecordReader();
    }
    
    // 可選:覆蓋分片邏輯
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // 示例:禁止分片
    }
}

4.2.2 實現RecordReader

public class CustomRecordReader extends RecordReader<LongWritable, Text> {
    private LineReader in;
    private LongWritable key;
    private Text value;
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        // 初始化讀取器
        FileSplit fileSplit = (FileSplit)split;
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream fileIn = fs.open(path);
        in = new LineReader(fileIn, conf);
    }
    
    @Override
    public boolean nextKeyValue() {
        // 讀取下一條記錄
        key = new LongWritable();
        value = new Text();
        int bytesRead = in.readLine(value);
        if (bytesRead == 0) return false;
        key.set(offset); // 設置偏移量
        return true;
    }
    // 其他方法實現...
}

4.2.3 配置使用

job.setInputFormatClass(CustomInputFormat.class);
FileInputFormat.addInputPath(job, new Path("input/"));

5. 高級輸入控制技術

5.1 輸入路徑過濾

通過FileInputFormat設置路徑過濾器:

// 自定義過濾器
public class RegexFilter extends PathFilter {
    private final String regex;
    
    public RegexFilter(String regex) {
        this.regex = regex;
    }
    
    @Override
    public boolean accept(Path path) {
        return path.toString().matches(regex);
    }
}

// 使用配置
FileInputFormat.setInputPathFilter(job, RegexFilter.class);
conf.set("filter.pattern", ".*\\.data$");

5.2 多路徑輸入處理

// 添加多個輸入路徑
FileInputFormat.addInputPath(job, new Path("input1/"));
FileInputFormat.addInputPath(job, new Path("input2/"));

// 為不同路徑設置不同InputFormat
MultipleInputs.addInputPath(job, new Path("input1/"), 
    TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path("input2/"),
    SequenceFileInputFormat.class, Mapper2.class);

5.3 輸入采樣與分片優化

通過InputSampler實現數據采樣,優化分片:

// 創建采樣器
InputSampler.Sampler<Text, Text> sampler = 
    new InputSampler.RandomSampler<>(0.1, 1000, 10);

// 寫入分區文件
InputSampler.writePartitionFile(job, sampler);

// 配置TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);

6. 性能優化實踐

6.1 分片大小調優

計算公式:

splitSize = max(minSize, min(maxSize, blockSize))

配置參數:

<!-- 最小分片大小 -->
<property>
    <name>mapreduce.input.fileinputformat.split.minsize</name>
    <value>0</value>
</property>

<!-- 最大分片大小 -->
<property>
    <name>mapreduce.input.fileinputformat.split.maxsize</name>
    <value>256000000</value>
</property>

6.2 壓縮數據輸入處理

支持壓縮格式: - DEFLATE (.deflate) - gzip (.gz) - bzip2 (.bz2) - LZO (.lzo) - Snappy (.snappy)

自動檢測機制:

CompressionCodecFactory codecFactory = 
    new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(filePath);

6.3 本地化優化

通過以下配置提高數據本地化:

<property>
    <name>mapreduce.job.split.metainfo.maxsize</name>
    <value>10000000</value>
</property>
<property>
    <name>mapreduce.input.fileinputformat.split.maxsize</name>
    <value>134217728</value> <!-- 匹配HDFS塊大小 -->
</property>

7. 常見問題解決方案

7.1 小文件問題

解決方案對比:

方案 優點 缺點
CombineFileInputFormat 自動合并小文件 需要內存緩沖
HAR歸檔文件 減少NameNode壓力 需要額外歸檔步驟
SequenceFile合并 高效二進制格式 不可直接查看內容

實現示例:

job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256000000);

7.2 數據傾斜處理

應對策略: 1. 預處理階段數據采樣 2. 自定義分區器(Partitioner) 3. 使用Combiner減少數據傳輸 4. 傾斜鍵單獨處理

7.3 格式異常處理

增強魯棒性的方法:

// 在RecordReader中實現容錯邏輯
try {
    // 解析記錄
} catch (MalformedRecordException e) {
    context.getCounter("Error", "BadRecords").increment(1);
    if (skipBadRecords) continue;
    else throw e;
}

8. 未來發展趨勢

  1. 向量化輸入:Apache ORC/Parquet等列式存儲的向量化讀取
  2. 云原生存儲適配:對S3、OSS等對象存儲的優化支持
  3. 智能分片:基于機器學習預測的自動分片大小調整
  4. 流批一體:與Flink等流處理框架的統一輸入接口

通過深入理解MapReduce數據輸入機制,開發者可以針對不同業務場景選擇最優的輸入策略,構建高效可靠的大數據處理管道。 “`

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女