溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何理解flink 1.11 中的JDBC Catalog

發布時間:2021-11-23 18:14:01 來源:億速云 閱讀:290 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

 

背景

1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關系型數據庫或讀取 changelog 時,必須要手動創建對應的 schema。但是這樣會有一個問題,當數據庫中的 schema 發生變化時,也需要手動更新對應的 Flink 任務以保持類型匹配,任何不匹配都會造成運行時報錯使作業失敗。這個操作冗余且繁瑣,體驗極差。

實際上對于任何和 Flink 連接的外部系統都可能有類似的上述問題,在 1.11.0 中重點解決了和關系型數據庫對接的這個問題。提供了 JDBC catalog 的基礎接口以及 Postgres catalog 的實現,這樣方便后續實現與其它類型的關系型數據庫的對接。

1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進行檢查報錯,避免了之前運行時報錯造成的作業失敗。

 

示例

目前對于jdbc catalog,flink僅提供了postgres catalog,我們基于postgres的catalog講解一下如何使用flink的catalog ,

  • 引入pom
   <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>

 
  • 新建PostgresCatalog    
    目前flink通過一個靜態類來創建相相應的jdbc  catalog,對于PostgresCatalog,沒有提供public類型的構造方法。

通過JdbcCatalogUtils.createCatalog構造PostgresCatalog時這五個參數都是必填項,其中baseUrl要求是不能帶有數據庫名的

  String catalogName = "mycatalog";
  String defaultDatabase = "postgres";
  String username = "postgres";
  String pwd = "postgres";
  String baseUrl = "jdbc:postgresql://localhost:5432/";

  PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
    catalogName,
    defaultDatabase,
    username,
    pwd,
    baseUrl);
 

訪問postgres 數據庫指定表名的時候完整的路徑名應該是以下格式:

<catalog>.<db>.`<schema.table>`
 

其中schema默認是public,如果是使用缺省值,public是可以省略的。比如下面的查詢語句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
 

如果非缺省schema,則不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
 
  • 常見操作

我們PostgresCatalog將注冊到StreamTableEnvironment 的變量tEnv中,然后就可以用tEnv進行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());
 
  1. 列出來所有的數據庫:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);
 
  1. 列出來所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);
 
  1. 列出所有函數
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);
 
  1. 獲取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);
 
  1. 查詢表的數據
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);
 
  1. 插入數據
tEnv.executeSql("insert into table1 values (3,'c')");
 

完整的代碼請參考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源碼解析

 

AbstractJdbcCatalog

這個類主要是對jdbc catalog一些公共的操作做了抽象.目前實現了實際功能的只有一個方法:getPrimaryKey,其他方式主要是對于Catalog的一些其他實現類做了特殊處理,比如類似create table 或者 alter table是不支持的,listView只是返回一個空列表,因為我們使用jdbc catalog主要是來做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }
   

PostgresCatalog

在這里面,主要是實現了一些常用的操作數據庫的方法,比如getTable、listTables、listDatabases等等,其實簡單的來說就是從postgres元數據庫里查詢出來相應的信息,然后組裝成flink的相關對象,返回給調用方。以一個簡單的方法listDatabases為例:

從元數據表pg_database中查詢所有的tablename,然后去掉內置的數據庫,也就是template0和template1,然后封裝到一個list對象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }
 

有不兼容的地方需要做一些轉換,比如getTable方法,有些數據類型是不匹配的,要做一些類型的匹配,如postgres里面的serial和int4都會轉成flink的int類型,具體的參考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................


看完上述內容,你們對如何理解flink 1.11 中的JDBC Catalog有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女