# Hadoop中怎么實現MapReduce的數據輸入
## 1. MapReduce數據輸入概述
MapReduce作為Hadoop的核心計算框架,其數據處理流程始于數據輸入階段。數據輸入機制決定了MapReduce作業如何讀取原始數據并將其轉化為可供Mapper處理的鍵值對形式。在Hadoop生態系統中,數據輸入過程具有以下核心特點:
1. **分布式特性**:輸入數據通常存儲在HDFS上,被自動劃分為多個數據塊(默認128MB)
2. **并行處理**:每個數據分片(InputSplit)由一個獨立的Mapper任務處理
3. **格式無關性**:通過InputFormat抽象類支持多種數據格式處理
4. **位置感知**:遵循"移動計算比移動數據更高效"原則,盡量在數據所在節點執行計算
## 2. 核心組件與工作機制
### 2.1 InputFormat抽象類
作為數據輸入的最高層抽象,InputFormat定義了兩個核心職責:
```java
public abstract class InputFormat<K, V> {
// 獲取輸入數據的分片信息
public abstract List<InputSplit> getSplits(JobContext context);
// 創建RecordReader用于讀取分片數據
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}
InputSplit表示邏輯上的數據分片,包含以下關鍵信息:
public abstract class InputSplit {
public abstract long getLength();
public abstract String[] getLocations();
}
RecordReader負責將InputSplit轉化為具體的鍵值對記錄:
public abstract class RecordReader<KEYIN, VALUEIN> {
// 初始化方法
public abstract void initialize(InputSplit split, TaskAttemptContext context);
// 讀取下一條記錄
public abstract boolean nextKeyValue();
// 獲取當前鍵
public abstract KEYIN getCurrentKey();
// 獲取當前值
public abstract VALUEIN getCurrentValue();
// 獲取進度
public abstract float getProgress();
// 關閉資源
public abstract void close();
}
處理純文本文件的默認實現,特點包括: - 每行文本記錄 - 鍵為LongWritable類型(字節偏移量) - 值為Text類型(行內容)
// 典型使用方式
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);
處理鍵值對文本的特殊格式:
- 每行格式為”key[分隔符]value”
- 默認分隔符是制表符(\t)
- 可通過mapreduce.input.keyvaluelinerecordreader.key.value.separator
配置
二進制文件輸入格式,支持三種類型:
1. SequenceFileInputFormat
CompositeInputFormat 用于多路徑輸入場景: - 支持多個輸入路徑的聯合查詢 - 通過join表達式定義數據關聯方式 - 典型應用場景:Map端join操作
// 配置示例
conf.set("mapreduce.join.expr",
CompositeInputFormat.compose("inner",
KeyValueTextInputFormat.class, "/path1", "/path2"));
需要自定義輸入格式的典型場景: - 處理非標準格式數據(如二進制協議) - 需要特殊分片邏輯(如不可分割的壓縮格式) - 多數據源組合輸入 - 需要預處理原始數據
public class CustomInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CustomRecordReader();
}
// 可選:覆蓋分片邏輯
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false; // 示例:禁止分片
}
}
public class CustomRecordReader extends RecordReader<LongWritable, Text> {
private LineReader in;
private LongWritable key;
private Text value;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// 初始化讀取器
FileSplit fileSplit = (FileSplit)split;
Configuration conf = context.getConfiguration();
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(path);
in = new LineReader(fileIn, conf);
}
@Override
public boolean nextKeyValue() {
// 讀取下一條記錄
key = new LongWritable();
value = new Text();
int bytesRead = in.readLine(value);
if (bytesRead == 0) return false;
key.set(offset); // 設置偏移量
return true;
}
// 其他方法實現...
}
job.setInputFormatClass(CustomInputFormat.class);
FileInputFormat.addInputPath(job, new Path("input/"));
通過FileInputFormat設置路徑過濾器:
// 自定義過濾器
public class RegexFilter extends PathFilter {
private final String regex;
public RegexFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return path.toString().matches(regex);
}
}
// 使用配置
FileInputFormat.setInputPathFilter(job, RegexFilter.class);
conf.set("filter.pattern", ".*\\.data$");
// 添加多個輸入路徑
FileInputFormat.addInputPath(job, new Path("input1/"));
FileInputFormat.addInputPath(job, new Path("input2/"));
// 為不同路徑設置不同InputFormat
MultipleInputs.addInputPath(job, new Path("input1/"),
TextInputFormat.class, Mapper1.class);
MultipleInputs.addInputPath(job, new Path("input2/"),
SequenceFileInputFormat.class, Mapper2.class);
通過InputSampler實現數據采樣,優化分片:
// 創建采樣器
InputSampler.Sampler<Text, Text> sampler =
new InputSampler.RandomSampler<>(0.1, 1000, 10);
// 寫入分區文件
InputSampler.writePartitionFile(job, sampler);
// 配置TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
計算公式:
splitSize = max(minSize, min(maxSize, blockSize))
配置參數:
<!-- 最小分片大小 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>0</value>
</property>
<!-- 最大分片大小 -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>256000000</value>
</property>
支持壓縮格式: - DEFLATE (.deflate) - gzip (.gz) - bzip2 (.bz2) - LZO (.lzo) - Snappy (.snappy)
自動檢測機制:
CompressionCodecFactory codecFactory =
new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(filePath);
通過以下配置提高數據本地化:
<property>
<name>mapreduce.job.split.metainfo.maxsize</name>
<value>10000000</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>134217728</value> <!-- 匹配HDFS塊大小 -->
</property>
解決方案對比:
方案 | 優點 | 缺點 |
---|---|---|
CombineFileInputFormat | 自動合并小文件 | 需要內存緩沖 |
HAR歸檔文件 | 減少NameNode壓力 | 需要額外歸檔步驟 |
SequenceFile合并 | 高效二進制格式 | 不可直接查看內容 |
實現示例:
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 256000000);
應對策略: 1. 預處理階段數據采樣 2. 自定義分區器(Partitioner) 3. 使用Combiner減少數據傳輸 4. 傾斜鍵單獨處理
增強魯棒性的方法:
// 在RecordReader中實現容錯邏輯
try {
// 解析記錄
} catch (MalformedRecordException e) {
context.getCounter("Error", "BadRecords").increment(1);
if (skipBadRecords) continue;
else throw e;
}
通過深入理解MapReduce數據輸入機制,開發者可以針對不同業務場景選擇最優的輸入策略,構建高效可靠的大數據處理管道。 “`
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。