Experiment name: DataJoin (joining data from multiple sources)
Experiment purpose:
1. To record my Hadoop experiment process (I am a student in the NCU HANG TIAN BAN). The complete, runnable code is attached. The overall framework of the program is a common template (found on Baidu and in textbooks), but the key algorithms are my own and are marked as such. The framework template I referenced is http://blog.csdn.net/wawmg/article/details/8759076 .
2. Tip: for a quick read-through, see the bolded sections【1、2、3、4】.
Experiment requirements:
Task 1: inner join of multiple data sources
[Sample data]
Input:
factory:
factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
Nanchang Univ 5
address:
addressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
Output:
factoryname addressID addressname
Bank of Beijing 1 Beijing
Beijing Red Star 1 Beijing
Beijing Rising 1 Beijing
Guangzhou Development Bank 2 Guangzhou
Guangzhou Honda 2 Guangzhou
Shenzhen Thunder 3 Shenzhen
Tencent 3 Shenzhen
[The code begins]【1】
// First, the TaggedWritable class: copied as-is, no changes
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
/* TaggedMapOutput is an abstract type that wraps a tag together with the record data.
Here it serves as the output value type of DataJoinMapperBase. It has to implement the Writable
interface, so the two serialization methods below are provided. This is the custom value type. */
public class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable() {
this.tag = new Text();
}
public TaggedWritable(Writable data) // constructor
{
this.tag = new Text(); // the tag can be set later via setTag()
this.data = data;
}
@Override
public void readFields(DataInput in) throws IOException {
tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
data.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
tag.write(out);
out.writeUTF(this.data.getClass().getName());
data.write(out);
}
@Override
public Writable getData() {
return data;
}
}
// http://blog.csdn.net/wawmg/article/details/8759076
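As a quick sanity check of the serialization above, a write()/readFields() round trip through an in-memory byte buffer should reproduce both the tag and the data. This is only a sketch for local testing, not part of the job; the class name TaggedWritableCheck is made up here.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.Text;
// Hypothetical helper: checks that write() and readFields() of TaggedWritable are symmetric.
public class TaggedWritableCheck {
    public static void main(String[] args) throws Exception {
        TaggedWritable original = new TaggedWritable(new Text("Beijing Red Star 1"));
        original.setTag(new Text("factory.txt"));
        // Serialize with write(), then deserialize into a fresh instance with readFields().
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        TaggedWritable copy = new TaggedWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // Expected: factory.txt / Beijing Red Star 1
        System.out.println(copy.getTag() + " / " + copy.getData());
    }
}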
【2】Map phase: the algorithm here is my own
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class JoinMapper extends DataJoinMapperBase {
// Called when the task starts, to generate the tag for this input source.
// Here the input file name itself is used as the tag.
@Override
protected Text generateInputTag(String inputFile) {
System.out.println("inputFile = " + inputFile);
return new Text(inputFile);
}
// Here the field delimiter is fixed to a single space; more generally, the user should be able to specify the delimiter and the group key.
// Generate the group key.
@Override
protected Text generateGroupKey(TaggedMapOutput record) {
String tag = ((Text) record.getTag()).toString();
if(tag.indexOf("factory") != -1){
String line = ((Text) record.getData()).toString();
String[] tokens = line.split(" ");
int len = tokens.length - 1;
return new Text(tokens[len]);
}else{
String line = ((Text) record.getData()).toString();
String[] tokens = line.split(" ");
return new Text(tokens[0]);
}
}
// Returns a TaggedWritable carrying whatever Text tag we want.
@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag); // do not forget to set the tag of the current record
return retv;
}
}
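Since generateGroupKey() is protected, the quickest way to see what it produces is to replay its splitting logic on the sample lines. The standalone sketch below (GroupKeyDemo is my own name, purely for illustration) shows that a factory record and an address record with the same addressID yield the same group key, which is what makes the join possible.
// Standalone illustration of the group-key logic in JoinMapper.generateGroupKey().
public class GroupKeyDemo {
    public static void main(String[] args) {
        // Factory record: the group key is the last space-separated token (the addressID).
        String factoryLine = "Beijing Red Star 1";
        String[] f = factoryLine.split(" ");
        System.out.println("factory key = " + f[f.length - 1]); // 1
        // Address record: the group key is the first token (also the addressID).
        String addressLine = "1 Beijing";
        String[] a = addressLine.split(" ");
        System.out.println("address key = " + a[0]); // 1
    }
}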
【3】Reduce phase: this algorithm is also my own
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
public class JoinReducer extends DataJoinReducerBase {
// The two argument arrays always have the same length, which is at most the number of data sources.
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2) return null; // this check implements the inner join: groups that appear in only one source are dropped
String joinedStr = "";
String dd = " ";
for (int i = 0; i < values.length; i++) {
// A single space is used as the separator when linking the records from the two sources.
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
// For the first record (the address), split it and keep only the address name, dropping its group key.
if( i == 0){
String[] tokens = line.split(" ");
dd += tokens[1];
}
if(i == 1){
joinedStr += line;
System.out.println(joinedStr);
}
}
joinedStr += dd;
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[1]); // set the tag of the combined record (the group key itself is emitted as the final output key)
return retv;
}
}
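To make the ordering assumption in combine() explicit: with these two input files the tags sort alphabetically, so values[0] should hold the address record and values[1] the factory record. The standalone sketch below (CombineDemo is my own name, not part of the job) replays the string handling above on one pair of sample records.
// Standalone illustration of the string handling in JoinReducer.combine().
public class CombineDemo {
    public static void main(String[] args) {
        String addressLine = "1 Beijing";          // assumed values[0]: the "address" tag sorts first
        String factoryLine = "Beijing Red Star 1"; // assumed values[1]: the factory record
        // i == 0: keep only the address name, prefixed with a space.
        String dd = " " + addressLine.split(" ")[1];
        // i == 1: keep the whole factory line.
        String joinedStr = factoryLine;
        // Joined record in the form: factoryname addressID addressname
        System.out.println(joinedStr + dd); // Beijing Red Star 1 Beijing
    }
}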
【4】Driver class: copied as-is, no changes
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DataJoinDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Path in = new Path("hdfs://localhost:9000/user/c/input/*.txt");
Path out = new Path("hdfs://localhost:9000/user/c/output2");
JobConf job = new JobConf(conf, DataJoinDriver.class);
job.setJobName("DataJoin");
FileSystem hdfs = FileSystem.get(conf);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new DataJoinDriver(),
args);
System.exit(res);
}
}
One last note: the output has a small issue, namely that it is not sorted.