Lu Chunli's work notes: who says programmers can't have a literary side?
Environment:
hadoop-2.6.0
hbase-1.0.1
zookeeper-3.4.6
1. Hadoop cluster setup is omitted;
2. ZooKeeper cluster setup is omitted;
3. HBase cluster setup is omitted;
4. HBase as an input source
View the data currently in the HBase table m_domain:
[hadoop@dnode1 conf]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

hbase(main):001:0> list
TABLE
m_domain
t_domain
2 row(s) in 0.9270 seconds

=> ["m_domain", "t_domain"]
hbase(main):002:0> scan 'm_domain'
ROW                              COLUMN+CELL
 alibaba.com_19990415_20220523   column=cf:access_server, timestamp=1440947490018, value=\xE6\x9D\xAD\xE5\xB7\x9E
 alibaba.com_19990415_20220523   column=cf:exp_date, timestamp=1440947490018, value=2022\xE5\xB9\xB405\xE6\x9C\x8823\xE6\x97\xA5
 alibaba.com_19990415_20220523   column=cf:ipstr, timestamp=1440947490018, value=205.204.101.42
 alibaba.com_19990415_20220523   column=cf:owner, timestamp=1440947490018, value=Hangzhou Alibaba Advertising Co.
 alibaba.com_19990415_20220523   column=cf:reg_date, timestamp=1440947490018, value=1999\xE5\xB9\xB404\xE6\x9C\x8815\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:access_server, timestamp=1440947489956, value=\xE5\x8C\x97\xE4\xBA\xAC
 baidu.com_19991011_20151011     column=cf:exp_date, timestamp=1440947489956, value=2015\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
 baidu.com_19991011_20151011     column=cf:ipstr, timestamp=1440947489956, value=220.181.57.217
 baidu.com_19991011_20151011     column=cf:reg_date, timestamp=1440947489956, value=1999\xE5\xB9\xB410\xE6\x9C\x8811\xE6\x97\xA5
2 row(s) in 1.4560 seconds

hbase(main):003:0> quit
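Before wiring this table into a MapReduce job, a single row can be sanity-checked through the plain HBase 1.0 client API. This is a minimal sketch rather than part of the original notes; the class name is made up, and the ZooKeeper quorum and row key are simply taken from this environment and the scan above.
package com.invic.mapreduce.hbase.source;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MDomainGetCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("m_domain"))) {
            // Fetch one of the rows shown in the scan output and print a single cell.
            Result result = table.get(new Get(Bytes.toBytes("baidu.com_19991011_20151011")));
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ipstr"))));
        }
    }
}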
Implementing the Mapper:
package com.invic.mapreduce.hbase.source;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
*
* @author lucl
* TableMapper extends Mapper; every Mapper that reads from HBase as its input source must extend this class.
*/
public class HBaseReaderMapper extends TableMapper<Writable, Writable> {
private Text key = new Text();
private Text value = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}
@Override
protected void map(ImmutableBytesWritable row, Result result,Context context)
throws IOException, InterruptedException {
// Option 1: the column family is known and can be given explicitly
{
NavigableMap<byte[], byte[]> map = result.getFamilyMap("cf".getBytes());
Set<Entry<byte[], byte[]>> values = map.entrySet();
for (Entry<byte[], byte[]> entry : values) {
String columnQualifier = Bytes.toString(entry.getKey());
// Bytes.toString decodes UTF-8, so multi-byte cell values (e.g. the Chinese access_server values) display correctly
String cellValue = Bytes.toString(entry.getValue());
System.out.println(columnQualifier + "\t" + cellValue);
//
}
}
// Option 2: there are multiple column families, or the family names are not known in advance
{
// Convert the row key using offset/length; the backing array returned by get() may be larger than the key itself
String rowKey = Bytes.toString(row.get(), row.getOffset(), row.getLength());
byte [] columnFamily = null;
byte [] columnQualifier = null;
byte [] cellValue = null;
StringBuffer sbf = new StringBuffer(1024);
for (Cell cell : result.listCells()) {
columnFamily = CellUtil.cloneFamily(cell);
columnQualifier = CellUtil.cloneQualifier(cell);
cellValue = CellUtil.cloneValue(cell);
sbf.append(Bytes.toString(columnFamily));
sbf.append(".");
sbf.append(Bytes.toString(columnQualifier));
sbf.append(":");
sbf.append(new String(cellValue, "UTF-8"));
}
key.set(rowKey);
value.set(sbf.toString());
context.write(key, value);
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException{
super.cleanup(context);
}
}
Implementing the MapReduce Driver class:
package com.invic.mapreduce.hbase.source;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author lucl
* Example of using HBase as an input source
*
*/
public class HBaseASDataSourceDriver extends Configured implements Tool {
/**
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
int exit = ToolRunner.run(new HBaseASDataSourceDriver(), args);
System.out.println("receive exit : " + exit);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
// Hadoop (HDFS HA) configuration parameters
/*conf.set("fs.defaultFS", "hdfs://cluster");
conf.set("dfs.nameservices", "cluster");
conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
conf.set("dfs.client.failover.proxy.provider.cluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");*/
// hbase master
// property "hbase.master" has been deprecated since 0.90
// Just passing the ZK configuration makes your client auto-discover the master
// conf.set("hbase.master", "nnode:60000");
// ZooKeeper quorum. These must be set on the same Configuration that is handed to the Job
// ("conf" here); setting them on getConf() leaves the job's configuration with the default
// 127.0.0.1 quorum (see problem c below). Note the case-sensitive property name "clientPort".
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
// Disable speculative execution for map tasks
conf.setBoolean("mapreduce.map.speculative", false);
// Disable speculative execution for reduce tasks
conf.setBoolean("mapreduce.reduce.speculative", false);
Job job = Job.getInstance(conf);
job.setJobName("MyBaseReaderFromHBase");
job.setJarByClass(HBaseASDataSourceDriver.class);
job.setOutputFormatClass(TextOutputFormat.class);
/**
* Data read from HBase is passed to the Mapper registered below (HBaseReaderMapper), where the per-row processing happens.
* Since no Reducer class is specified for the job, the default (identity) Reducer writes the Mapper output unchanged;
* if additional processing is needed, a custom Reducer class can be defined and registered
* (a hedged sketch of such a Reducer follows this driver listing).
*/
Scan scan = new Scan();
// scan.addFamily(family);
// scan.addColumn(family, qualifier);
byte [] tableName = Bytes.toBytes("m_domain");
TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseReaderMapper.class, Text.class, Text.class, job);
Path path = new Path("/" + System.currentTimeMillis());
FileOutputFormat.setOutputPath(job, path);
return job.waitForCompletion(true) ? 0 : 1;
}
}
View the results (the job writes its text output to an HDFS directory named after the current timestamp):

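As noted in the driver comments, no Reducer is registered for this job, so the default identity Reducer writes the Mapper output straight to the text files. If per-row post-processing is wanted, a custom Reducer can be registered with job.setReducerClass(...). The following is a minimal, hedged sketch (the class name is made up; it merely forwards each pair, marking where extra processing would go):
package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseReaderReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Filtering, reformatting or aggregation of the row's detail string would go here.
            context.write(key, value);
        }
    }
}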
Problems encountered:
a. Running the job from Eclipse fails with an error; the cause has not been identified.
b. When running on the cluster, if the Mapper class is defined inside the Driver class, the job fails with
ClassNotFound for HBaseASDataSourceDriver$HBaseReaderMapper init()
(a hedged sketch of the usual fix follows this list).
c. The ZooKeeper connect string always shows 127.0.0.1 instead of the configured hbase.zookeeper.quorum
(most likely because the quorum was set on getConf() rather than on the Configuration passed to the Job; see the corrected driver above). It is not clear whether this would also be an issue when the ZooKeeper ensemble runs on different machines from HBase.
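For problem b, the usual remedy is either to keep the Mapper as a separate top-level class (as done above) or, if it must live inside the Driver, to declare it as a public static nested class so the framework can instantiate it by reflection on the task JVMs. A minimal, hedged sketch with made-up class names:
package com.invic.mapreduce.hbase.source;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class DriverWithNestedMapper {

    // "public static" is the key point: a non-static inner class needs an enclosing
    // instance and cannot be instantiated by name on remote task JVMs.
    public static class NestedReaderMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result result, Context context)
                throws IOException, InterruptedException {
            // Same kind of per-row processing as HBaseReaderMapper above; here just row key and cell count.
            context.write(
                    new Text(Bytes.toString(row.get(), row.getOffset(), row.getLength())),
                    new Text(String.valueOf(result.size())));
        }
    }

    // run()/main() would mirror HBaseASDataSourceDriver, registering NestedReaderMapper via
    // TableMapReduceUtil.initTableMapperJob(...).
}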
5. HBase as an output source
The input text file contains the following records (fields are tab-separated):
2013-09-13 16:04:08   www.subnetc1.com   192.168.1.7   80   192.168.1.139   18863   HTTP   www.subnetc1.com/index.html
2013-09-13 16:04:08   www.subnetc2.com   192.168.1.7   80   192.168.1.159   14100   HTTP   www.subnetc2.com/index.html
2013-09-13 16:04:08   www.subnetc3.com   192.168.1.7   80   192.168.1.130   4927    HTTP   www.subnetc3.com/index.html
2013-09-13 16:04:08   www.subnetc4.com   192.168.1.7   80   192.168.1.154   39044   HTTP   www.subnetc4.com/index.html
Mapper code:
package com.invic.mapreduce.hbase.target;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Variant 1 (commented out): wordcount example, which would use Mapper<Object, Text, Text, IntWritable>
/*{
IntWritable one = new IntWritable(1);
Text word = new Text();
StringTokenizer token = new StringTokenizer(value.toString());
while (token.hasMoreTokens()) {
word.set(token.nextToken());
context.write(word, one);
}
}*/
// Variant 2: write multi-column data into HBase, using Mapper<Object, Text, Text, Text>
{
String [] temps = value.toString().split("\t");
if (null != temps && temps.length == 8) {
Text word = new Text();
word.set(temps[1]);
context.write(word, value);
}
}
}
}
Reducer code:
package com.invic.mapreduce.hbase.target;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
*
* @author lucl
*
*/
public class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
@Override
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
// for wordcount
// TableReducer<Text, IntWritable, ImmutableBytesWritable>
// Iterable<IntWritable>
/*{
int sum = 0;
for (Iterator<IntWritable> it = value.iterator(); it.hasNext(); ) {
IntWritable val = it.next();
sum += val.get();
}
Put put = new Put(key.getBytes());
// sum is an int; convert it to a String first and then to bytes, otherwise the value cannot be read back as text when scanning the table
byte [] datas = Bytes.toBytes(String.valueOf(sum));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), datas);
context.write(new ImmutableBytesWritable(key.getBytes()), put);
}*/
// Write the whole multi-column record into HBase
// TableReducer<Text, Text, ImmutableBytesWritable>
// Iterable<Text> value
{
byte [] family = "cf".getBytes();
// Text.getBytes() returns the whole backing array, which may hold stale bytes beyond getLength();
// derive the row key from toString() instead
byte [] rowKey = Bytes.toBytes(key.toString());
Put put = new Put(rowKey);
StringBuffer sbf = new StringBuffer();
for (Text text : value) {
sbf.append(text.toString());
}
put.addColumn(family, Bytes.toBytes("detail"), Bytes.toBytes(sbf.toString()));
context.write(new ImmutableBytesWritable(rowKey), put);
}
}
}
Driver class:
package com.invic.mapreduce.hbase.target;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author lucl
* Example of using HBase as an output source
*
*/
public class HBaseASDataTargetDriver extends Configured implements Tool {
private static final String TABLE_NAME = "t_inter_log";
private static final String COLUMN_FAMILY_NAME = "cf";
/**
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// for eclipse
// System.setProperty("hadoop.home.dir", "E:\\hadoop-2.6.0\\hadoop-2.6.0\\");
int exit = ToolRunner.run(new HBaseASDataTargetDriver(), args);
System.out.println("receive exit : " + exit);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(getConf());
// Hadoop (HDFS HA) configuration parameters
conf.set("fs.defaultFS", "hdfs://cluster");
conf.set("dfs.nameservices", "cluster");
conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
conf.set("dfs.client.failover.proxy.provider.cluster",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
// hbase master
// property "hbase.master" has been deprecated since 0.90
// Just passing the ZK configuration makes your client auto-discover the master
// conf.set("hbase.master", "nnode:60000");
// zookeeper quorum
conf.set("hbase.zookeeper.property.clientPort", "2181"); // the property name is case-sensitive: clientPort
conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
// Disable speculative execution for map tasks
conf.setBoolean("mapreduce.map.speculative", false);
// Disable speculative execution for reduce tasks
conf.setBoolean("mapreduce.reduce.speculative", false);
/**
* Create the HBase target table (dropping it first if it already exists)
*/
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
TableName tableName = TableName.valueOf(TABLE_NAME);
boolean exists = admin.tableExists(tableName);
if (exists) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
HColumnDescriptor columnDesc = new HColumnDescriptor(COLUMN_FAMILY_NAME);
tableDesc.addFamily(columnDesc);
admin.createTable(tableDesc);
/**
* Read the input file and set up the job
*/
String fileName = "http_interceptor_20130913.txt";
Job job = Job.getInstance(conf);
job.setJobName("MyBaseWriterToHBase");
job.setJarByClass(HBaseASDataTargetDriver.class);
job.setMapperClass(MyMapper.class);
/**
* When MapReduce reads a plain text file, the Mapper has the usual four type parameters (KeyIn, ValueIn, KeyOut, ValueOut).
* Notes:
* By default KeyIn is LongWritable (the byte offset of the line within the file) and ValueIn is Text (one line of data).
* First test: the map output key-value types were not set explicitly, and the job ran correctly.
* Second test: the map output key-value types were set via
* job.setMapOutputKeyClass
* job.setMapOutputValueClass
* "Hadoop應用開發技術詳解" (1st edition, January 2015, p. 191) states that
* the default map output key-value types are LongWritable and Text,
* but in the example MyMapper the map output key-value types are actually Text and IntWritable (wordcount variant).
*
// job.setMapOutputKeyClass(LongWritable.class);
// job.setMapOutputValueClass(Text.class);
// With these two settings the job failed as follows:
15/09/04 22:19:06 INFO mapreduce.Job: Task Id : attempt_1441346242717_0014_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:21)
at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Third test: with the output types set to match the Mapper implementation, the job ran correctly.
*/
// For the multi-column variant MyMapper emits (Text, Text), so both map output types must be Text;
// IntWritable would only match the commented-out wordcount variant.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Do not add the line below: in testing, adding it caused an error that the MyReducer class could not be found.
// (TableMapReduceUtil.initTableReducerJob below already registers MyReducer as the job's Reducer.)
// job.setReducerClass(MyReducer.class);
Path path = new Path(fileName);
FileInputFormat.addInputPath(job, path);
TableMapReduceUtil.initTableReducerJob(TABLE_NAME, MyReducer.class, job);
// for wordcount
// job.setOutputKeyClass(Text.class);
// job.setOutputValueClass(IntWritable.class);
// for multi columns
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
}
Without setting the map output key-value types, the job failed as shown below. (The wordcount example did not fail. "Hadoop應用開發技術詳解" states that the default map output key-value types are LongWritable and Text, yet the wordcount example's map output key-value types are Text and IntWritable.)
15/09/04 21:15:54 INFO mapreduce.Job:  map 0% reduce 0%
15/09/04 21:16:27 INFO mapreduce.Job: Task Id : attempt_1441346242717_0011_m_000000_0, Status : FAILED
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:29)
at com.invic.mapreduce.hbase.target.MyMapper.map(MyMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

# Since the map phase was still at 0% when the error occurred, the problem is on the map side. The message says the
# expected value type is IntWritable; the main difference between this example and wordcount is that the map output
# value changed from IntWritable to Text. Setting the following parameters solved the problem:
# job.setMapOutputKeyClass(Text.class);
# job.setMapOutputValueClass(Text.class);
Verification of the wordcount and data-loading example results:
hbase(main):005:0> scan 't_inter_log'
ROW                              COLUMN+CELL
 14100                           column=cf:count, timestamp=1441370812728, value=1
 16:04:08                        column=cf:count, timestamp=1441370812728, value=4
 18863:08                        column=cf:count, timestamp=1441370812728, value=1
 192.168.1.130                   column=cf:count, timestamp=1441370812728, value=1
 192.168.1.139                   column=cf:count, timestamp=1441370812728, value=1
 192.168.1.154                   column=cf:count, timestamp=1441370812728, value=1
 192.168.1.159                   column=cf:count, timestamp=1441370812728, value=1
 192.168.1.759                   column=cf:count, timestamp=1441370812728, value=4
 2013-09-13759                   column=cf:count, timestamp=1441370812728, value=4
 3904409-13759                   column=cf:count, timestamp=1441370812728, value=1
 4927409-13759                   column=cf:count, timestamp=1441370812728, value=1
 8027409-13759                   column=cf:count, timestamp=1441370812728, value=4
 HTTP409-13759                   column=cf:count, timestamp=1441370812728, value=4
 www.subnetc1.com                column=cf:count, timestamp=1441370812728, value=1
 www.subnetc1.com/index.html     column=cf:count, timestamp=1441370812728, value=1
 www.subnetc2.com/index.html     column=cf:count, timestamp=1441370812728, value=1
 www.subnetc3.com/index.html     column=cf:count, timestamp=1441370812728, value=1
 www.subnetc4.com/index.html     column=cf:count, timestamp=1441370812728, value=1
18 row(s) in 1.2290 seconds

# The t_inter_log table is dropped and recreated before each run.
hbase(main):007:0> scan 't_inter_log'
ROW                              COLUMN+CELL
 www.subnetc1.com                column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc1.com\x09192.168.1.7\x0980\x09192.168.1.139\x0918863\x09HTTP\x09www.subnetc1.com/index.html
 www.subnetc2.com                column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc2.com\x09192.168.1.7\x0980\x09192.168.1.159\x0914100\x09HTTP\x09www.subnetc2.com/index.html
 www.subnetc3.com                column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc3.com\x09192.168.1.7\x0980\x09192.168.1.130\x094927\x09HTTP\x09www.subnetc3.com/index.html
 www.subnetc4.com                column=cf:detail, timestamp=1441373481468, value=2013-09-13 16:04:08\x09www.subnetc4.com\x09192.168.1.7\x0980\x09192.168.1.154\x0939044\x09HTTP\x09www.subnetc4.com/index.html
4 row(s) in 3.3280 seconds
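A side note on the first scan above: row keys such as 192.168.1.759 and 2013-09-13759 look like a short key overlaid on the tail of a longer, previously processed one. That is consistent with the commented-out wordcount branch building its Put from key.getBytes(): Text.getBytes() returns the reused backing array, which can contain stale bytes beyond getLength(). A minimal, hedged sketch of the safer conversion (the class and method names are illustrative):
package com.invic.mapreduce.hbase.target;

import java.util.Arrays;

import org.apache.hadoop.io.Text;

public class RowKeyUtil {
    private RowKeyUtil() {
        // utility class, not meant to be instantiated
    }

    // Copy only the valid range of the Text backing array;
    // Bytes.toBytes(key.toString()) is an equivalent alternative (used in the corrected MyReducer above).
    public static byte[] rowKeyOf(Text key) {
        return Arrays.copyOf(key.getBytes(), key.getLength());
    }
}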
6. HBase as a shared source
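Assuming "shared source" means a single job that both reads from and writes to HBase, a minimal, hedged sketch could chain the pieces from sections 4 and 5: HBaseReaderMapper as the TableMapper and MyReducer as the TableReducer (both use Text keys and values, so the types line up). The table names m_domain and t_domain are just the ones from this environment, and t_domain is assumed to have a cf column family:
package com.invic.mapreduce.hbase.both;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import com.invic.mapreduce.hbase.source.HBaseReaderMapper;
import com.invic.mapreduce.hbase.target.MyReducer;

public class HBaseToHBaseDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "nnode,dnode1,dnode2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf, "HBaseToHBase");
        job.setJarByClass(HBaseToHBaseDriver.class);

        // HBase as the input source: scan m_domain and hand each row to the TableMapper.
        TableMapReduceUtil.initTableMapperJob(
                "m_domain", new Scan(), HBaseReaderMapper.class, Text.class, Text.class, job);
        // HBase as the output target: MyReducer turns each (rowkey, detail) pair into a Put on t_domain.
        TableMapReduceUtil.initTableReducerJob("t_domain", MyReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}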