This article shows how to write a MapReduce job against HBase, using a simple word-count example. The code is short and the structure is clear; hopefully it answers the common questions about wiring a Mapper and Reducer to HBase tables.
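The job reads lines from the content:words column of a wordcount table, splits them into words, and writes per-word totals into the info:wordcnt column of a result table, so both tables must exist before the job runs. Below is a minimal sketch of creating and seeding them, assuming an HBase 2.x client API; the class name CreateTables and the sample row are illustrative, not from the original post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Source table with the 'content' column family the mapper scans
            admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("wordcount"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("content")).build());
            // Target table with the 'info' column family the reducer writes to
            admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("result"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info")).build());
            // Seed one sample row so the job has input (illustrative data)
            try (Table t = conn.getTable(TableName.valueOf("wordcount"))) {
                Put p = new Put(Bytes.toBytes("row1"));
                p.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"),
                        Bytes.toBytes("hello you hello me"));
                t.put(p);
            }
        }
    }
}

With the tables in place, the job itself looks like this: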
package com.hbase.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class HbaseMrTest {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // Point the client at the ZooKeeper quorum
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Job job = Job.getInstance(conf, "word-count");
        // Main class that identifies the job's JAR
        job.setJarByClass(HbaseMrTest.class);
        Scan scan = new Scan();
        // Restrict the scan to the single column the mapper needs
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"));
        // Wire up the mapper: read from the 'wordcount' table
        TableMapReduceUtil.initTableMapperJob("wordcount", scan, HMapper.class, Text.class, IntWritable.class, job);
        // Wire up the reducer: write into the 'result' table
        TableMapReduceUtil.initTableReducerJob("result", HReducer.class, job);
        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Text, IntWritable are the mapper's output key/value types
class HMapper extends TableMapper<Text, IntWritable> {
    private final Text out = new Text();
    private final IntWritable iw = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Read the content:words cell straight off the Result
        byte[] bytes = value.getValue(Bytes.toBytes("content"), Bytes.toBytes("words"));
        if (bytes != null) {
            String words = Bytes.toString(bytes);
            // Split the line of words on spaces
            String[] ws = words.split(" ");
            for (String wd : ws) {
                out.set(wd);
                // Emit one (word, 1) pair, e.g. (you, 1)
                context.write(out, iw);
            }
        }
    }
}
// Text, IntWritable are the mapper's output types; the write type is Mutation
class HReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text text, Iterable<IntWritable> iter,
            Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Sum the counts for this word
        for (IntWritable intw : iter) {
            sum += intw.get();
        }
        // The Put constructor's argument becomes the row key
        Put put = new Put(Bytes.toBytes(text.toString()));
        // Store the total under the 'info' column family, 'wordcnt' column
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("wordcnt"), Bytes.toBytes(String.valueOf(sum)));
        // Write each word out as a row key; the Put carries the summed count
        context.write(new ImmutableBytesWritable(Bytes.toBytes(text.toString())), put);
    }
}
Finally, export the Java file as a Runnable JAR, copy it to the Linux host, and run it with java -jar hbase.jar com.hbase.test.HbaseMrTest.
Original data: (screenshot from the original post omitted)
Run result: (screenshot from the original post omitted)
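To confirm the output without screenshots, you can scan the result table and print each word with its count. A minimal sketch, reusing the same cluster settings as the job; the VerifyResult class name is an illustrative assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("result"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                // The row key is the word; info:wordcnt holds the count as a string
                String word = Bytes.toString(r.getRow());
                String count = Bytes.toString(
                        r.getValue(Bytes.toBytes("info"), Bytes.toBytes("wordcnt")));
                System.out.println(word + " = " + count);
            }
        }
    }
}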

That covers how to write a MapReduce job against HBase. Thanks for reading, and I hope the walkthrough was helpful!