溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用BulkLoad從HDFS批量導入數據到HBase

發布時間:2020-07-25 19:50:51 來源:網絡 閱讀:237 作者:wx5d81d4fe88546 欄目:大數據

在向Hbase中寫入數據時,常見的寫入方法有使用HBase API,Mapreduce批量導入數據,使用這些方式帶入數據時,一條數據寫入到HBase數據庫中的大致流程如圖。
使用BulkLoad從HDFS批量導入數據到HBase

數據發出后首先寫入到雨鞋日志WAl中,寫入到預寫日志中之后,隨后寫入到內存MemStore中,最后在Flush到Hfile中。這樣寫數據的方式不會導致數據的丟失,并且道正數據的有序性,但是當遇到大量的數據寫入時,寫入的速度就難以保證。所以,介紹一種性能更高的寫入方式BulkLoad。

使用BulkLoad批量寫入數據主要分為兩部分:
一、使用HFileOutputFormat2通過自己編寫的MapReduce作業將HFile寫入到HDFS目錄,由于寫入到HBase中的數據是按照順序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、將Hfile從HDFS移動到HBase表中,大致過程如圖
使用BulkLoad從HDFS批量導入數據到HBase

實例代碼pom依賴:

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.99.2</version>
        </dependency>
package com.yangshou;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //讀取文件中的每一條數據,以序號作為行鍵
        String line = value.toString();
        //將數據進行切分
        //切分后數組中的元素分別為:序號,用戶id,商品id,用戶行為,商品分類,時間,地址
        String[] str = line.split(" ");
        String id = str[0];
        String user_id = str[1];
        String item_id = str[2];
        String behavior = str[3];
        String item_type = str[4];
        String time = str[5];
        String address = "156";
        //拼接rowkey和put
        ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes());
        Put put = new Put(id.getBytes());
        put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes());
        put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes());
        put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes());
        put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes());
        put.add("info".getBytes(),"time".getBytes(),time.getBytes());
        put.add("info".getBytes(),"address".getBytes(),address.getBytes());
        //將數據寫出
        context.write(rowkry,put);
    }
}
package com.yangshou;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver  {
    public static void main(String[] args) throws Exception {
        //獲取Hbase配置
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("BulkLoadDemo"));
        Admin admin = conn.getAdmin();

        //設置job
        Job job = Job.getInstance(conf,"BulkLoad");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //設置文件的輸入輸出路徑
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1"));

        //將數據加載到Hbase表中
        HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));
        if(job.waitForCompletion(true)){
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));

        }

    }
}

實例數據

44979   100640791   134060896   1   5271    2014-12-09  天津市
44980   100640791   96243605    1   13729   2014-12-02  新疆

在Hbase shell 中創建表

create 'BulkLoadDemo','info'

打包后執行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver

注意:在執行hadoop jar之前應該先將Hbase中的相關包加載過來

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女