溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hbase怎么在不同版本hdfs集群之間轉移數據

發布時間:2021-09-15 18:20:17 來源:億速云 閱讀:114 作者:chen 欄目:云計算

這篇文章主要講解了“hbase怎么在不同版本hdfs集群之間轉移數據”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“hbase怎么在不同版本hdfs集群之間轉移數據”吧!

很多人會有這樣一個需求:將一個hdfs集群上的數據寫入另一個hdfs集群所在的hbase數據庫。通常情況下兩個hdfs集群的版本差距并不大,這樣的程序會很容易寫。但有時會跨大版本。比如作者所在的廠子,數據都在基于hadoop0.19.2版本修改的hdfs集群上,要將這樣的數據導入版本為0.20.2+的hdfs集群,就不能使用同一個hadoop jar包來完成了。如何實現呢?

    最簡單的辦法就是把src集群的數據導到本地,然后起另一個進程將本地數據傳到des集群上去。
    不過這有幾個問題:

  • 效率降低

  • 占用本地磁盤空間

  • 不能應付實時導數據需求

  • 兩個進程需要協調,復雜度增加


    更好的辦法是在同一個進程內一邊讀src數據,一邊寫des集群。不過這相當于在同一個進程空間內加載兩個版本的hadoop jar包,這就需要在程序中使用兩個classloader來實現。
    以下代碼可以實現classloader加載自定義的jar包,并生成需要的Configuration對象:

Java代碼

  1. URL[] jarUrls = new URL[1];   

  2. jarUrls[0]=new File(des_jar_path).toURI().toURL();   

  3. ClassLoader jarloader = new URLClassLoader(jarUrls, null);   

  4. Class Proxy = Class.forName("yourclass", true, jarloader);   

  5. Configuration conf = (Configuration)Proxy.newInstance();  

URL[] jarUrls = new URL[1];

jarUrls[0]=new File(des_jar_path).toURI().toURL();

ClassLoader jarloader = new URLClassLoader(jarUrls, null);

Class Proxy = Class.forName("yourclass", true, jarloader);

Configuration conf = (Configuration)Proxy.newInstance();



    但是由于在生成HTable對象時,需要使用這個conf對象,而加載這個conf對象的代碼本身是由默認的classloader加載的,也就是0.19.2的jar包。所以在以上代碼最后一行所強制轉換的Configuration對象仍然是0.19.2版本的。那怎么辦呢?
    琢磨了一會,發現如果要實現以上功能,必須將生成HTable對象,以及以后的所有hbase操作都使用這個新的classloader,因此這個新的classloader必須加載除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封裝進去。在外面用反射來調用。
    這樣的話,通常構造函數都不為空了,因此需要用到Constructor來構造一個自定義的構造函數
    代碼段如下:

Java代碼  

  1. main.java   

  2. void init(){   

  3.     ClassLoader jarloader = generateJarLoader();   

  4.     Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);   

  5.     Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});   

  6.     Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);   

  7.     proxy = con.newInstance(new Object[]{path, tablename, autoflush});   

  8. }   

  9. void put(){   

  10. ...   

  11.     while((line = getLine()) != null) {   

  12.         proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));   

  13.         Method addPut = proxy.getClass().getMethod("addPut",   

  14.                 new Class[]{String.class, String.class, String.class});   

  15.         addPut.invoke(proxy, new Object[]{field, column, encode});   

  16.         proxy.getClass().getMethod("putLine").invoke(proxy);   

  17.     }   

  18. }   

  19.   

  20. ClassLoader generateJarLoader() throws IOException {   

  21.       String libPath = System.getProperty("java.ext.dirs");   

  22.       FileFilter filter = new FileFilter() {   

  23.       @Override  

  24.       public boolean accept(File pathname) {   

  25.         if(pathname.getName().startsWith("hadoop-0.19.2"))   

  26.           return false;   

  27.         else  

  28.             return pathname.getName().endsWith(".jar");   

  29.       }   

  30.       };   

  31.       File[] jars = new File(libPath).listFiles(filter);   

  32.       URL[] jarUrls = new URL[jars.length+1];   

  33.            

  34.       int k = 0;   

  35.       for (int i = 0; i < jars.length; i++) {   

  36.         jarUrls[k++] = jars[i].toURI().toURL();   

  37.       }   

  38.       jarUrls[k] = new File("hadoop-0.20.205.jar")   

  39.       ClassLoader jarloader = new URLClassLoader(jarUrls, null);   

  40.       return jarloader;   

  41. }  

main.java

void init(){

  ClassLoader jarloader = generateJarLoader();

  Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

  Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});

  Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

  proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

  while((line = getLine()) != null) {

   proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));

   Method addPut = proxy.getClass().getMethod("addPut",

      new Class[]{String.class, String.class, String.class});

   addPut.invoke(proxy, new Object[]{field, column, encode});

   proxy.getClass().getMethod("putLine").invoke(proxy);

  }

}


ClassLoader generateJarLoader() throws IOException {

      String libPath = System.getProperty("java.ext.dirs");

      FileFilter filter = new FileFilter() {

      @Override

      public boolean accept(File pathname) {

        if(pathname.getName().startsWith("hadoop-0.19.2"))

          return false;

        else

         return pathname.getName().endsWith(".jar");

      }

      };

      File[] jars = new File(libPath).listFiles(filter);

      URL[] jarUrls = new URL[jars.length+1];

      int k = 0;

      for (int i = 0; i < jars.length; i++) {

        jarUrls[k++] = jars[i].toURI().toURL();

      }

      jarUrls[k] = new File("hadoop-0.20.205.jar")

      ClassLoader jarloader = new URLClassLoader(jarUrls, null);

      return jarloader;

}

Java代碼 

  1. HBaseProxy.java   

  2. public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)   

  3.      throws IOException{   

  4.         Configuration conf = new Configuration();   

  5.         conf.addResource(new Path(hbase_conf));   

  6.         config = new Configuration(conf);   

  7.         htable = new HTable(config, tableName);   

  8.         admin = new HBaseAdmin(config);   

  9.         htable.setAutoFlush(autoflush);   

  10.     }   

  11. public void addPut(String field, String column, String encode) throws IOException {   

  12.     try {   

  13.             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  14.                     field.getBytes(encode));   

  15.         } catch (UnsupportedEncodingException e) {   

  16.             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  17.                     field.getBytes());   

  18.         }   

  19.            

  20.     }   

  21.     public void generatePut(String rowkey){   

  22.         p = new Put(rowkey.getBytes());   

  23.     }   

  24.        

  25.     public void putLine() throws IOException{   

  26.         htable.put(p);   

  27.     }  

HBaseProxy.java

public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)

     throws IOException{

   Configuration conf = new Configuration();

   conf.addResource(new Path(hbase_conf));

   config = new Configuration(conf);

   htable = new HTable(config, tableName);

   admin = new HBaseAdmin(config);

   htable.setAutoFlush(autoflush);

  }

public void addPut(String field, String column, String encode) throws IOException {

    try {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes(encode));

   } catch (UnsupportedEncodingException e) {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes());

   }

  }

    public void generatePut(String rowkey){

   p = new Put(rowkey.getBytes());

  }

    public void putLine() throws IOException{

   htable.put(p);

  }


    總之,在同一個進程中加載多個classloader時一定要注意,classloader A所加載的對象是不能轉換成classloader B的對象的,當然也不能使用。兩個空間的相互調用只能用java的基本類型或是反射。

感謝各位的閱讀,以上就是“hbase怎么在不同版本hdfs集群之間轉移數據”的內容了,經過本文的學習后,相信大家對hbase怎么在不同版本hdfs集群之間轉移數據這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女