# 如何編寫MapReduce程序
## 1. MapReduce概述
### 1.1 什么是MapReduce
MapReduce是一種分布式計算編程模型,由Google在2004年提出,主要用于大規模數據集(大于1TB)的并行運算。其核心思想是將計算過程分解為兩個主要階段:
- **Map階段**:對輸入數據進行分割和處理
- **Reduce階段**:對Map結果進行匯總
### 1.2 工作原理
1. 輸入數據被自動分割成固定大小的塊(通常64MB或128MB)
2. Master節點將Map任務分配給Worker節點
3. Map任務處理輸入數據并生成中間鍵值對
4. 系統對中間結果進行排序和分組
5. Reduce任務處理分組后的數據
6. 最終結果寫入分布式文件系統
### 1.3 適用場景
- 大規模日志分析
- 網頁索引構建
- 數據挖掘
- 機器學習特征提取
## 2. 開發環境搭建
### 2.1 基礎環境要求
- Java JDK 1.8+
- Hadoop 2.7+(推薦3.x版本)
- Maven(項目管理工具)
- IDE(IntelliJ IDEA或Eclipse)
### 2.2 Hadoop安裝配置
```bash
# 下載Hadoop
wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.1/hadoop-3.3.1.tar.gz
# 解壓并配置環境變量
export HADOOP_HOME=/path/to/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
組件 | 職責 |
---|---|
InputFormat | 定義輸入數據格式和分割方式 |
Mapper | 實現map()方法處理輸入記錄 |
Partitioner | 決定中間結果的Reduce節點分配 |
Reducer | 實現reduce()方法匯總結果 |
OutputFormat | 定義輸出數據格式 |
原始數據 → InputSplit → RecordReader →
Mapper → Partitioner → Shuffle & Sort →
Reducer → RecordWriter → 輸出文件
public class WordCount {
// Mapper實現
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// Reducer實現
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// 主驅動程序
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Mapper類:
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
map()
方法處理每條記錄Reducer類:
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
reduce()
方法匯總相同鍵的值Driver程序:
Combiner是本地Reduce操作,可減少網絡傳輸:
job.setCombinerClass(IntSumReducer.class);
實現數據均衡分布:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// 按首字母分區
return key.toString().charAt(0) % numPartitions;
}
}
實現Writable接口:
public class WebLogRecord implements Writable {
private Text ip;
private LongWritable timestamp;
// 實現write()和readFields()方法
@Override
public void write(DataOutput out) throws IOException {
ip.write(out);
timestamp.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
ip.readFields(in);
timestamp.readFields(in);
}
}
參數 | 默認值 | 建議值 | 說明 |
---|---|---|---|
mapreduce.task.io.sort.mb | 100 | 200 | 排序緩沖區大小 |
mapreduce.map.sort.spill.percent | 0.8 | 0.9 | 溢出比例 |
mapreduce.reduce.shuffle.parallelcopies | 5 | 20 | 并行拷貝數 |
輸入文件處理:
內存配置:
<!-- mapred-site.xml -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
壓縮中間結果:
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec",
"org.apache.hadoop.io.compress.SnappyCodec");
Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
查看任務日志:
yarn logs -applicationId <app_id>
內存溢出:
數據傾斜:
任務超時:
<property>
<name>mapreduce.task.timeout</name>
<value>600000</value>
</property>
分析Nginx日志統計: - 每個URL的訪問量 - 每個IP的訪問頻率 - 高峰時段統計
192.168.1.1 - - [10/Oct/2023:14:32:01 +0800] "GET /index.html HTTP/1.1" 200 2326
public class LogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text url = new Text();
private final static IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(" ");
if(parts.length > 6) {
url.set(parts[6]); // 提取URL
context.write(url, one);
}
}
}
框架 | 特點 | 適用場景 |
---|---|---|
Spark | 內存計算 | 迭代算法 |
Flink | 流批一體 | 實時處理 |
Hive | SQL接口 | 數據倉庫 |
注意:實際運行MapReduce程序前,需確保Hadoop集群已正確配置。建議先在偽分布式環境下測試,再部署到生產集群。 “`
(全文約4200字,包含代碼示例、配置參數和實用技巧)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。