今天就跟大家聊聊有關如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關系型數據庫或讀取 changelog 時,必須要手動創建對應的 schema。但是這樣會有一個問題,當數據庫中的 schema 發生變化時,也需要手動更新對應的 Flink 任務以保持類型匹配,任何不匹配都會造成運行時報錯使作業失敗。這個操作冗余且繁瑣,體驗極差。
實際上對于任何和 Flink 連接的外部系統都可能有類似的上述問題,在 1.11.0 中重點解決了和關系型數據庫對接的這個問題。提供了 JDBC catalog 的基礎接口以及 Postgres catalog 的實現,這樣方便后續實現與其它類型的關系型數據庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進行檢查報錯,避免了之前運行時報錯造成的作業失敗。
目前對于jdbc catalog,flink僅提供了postgres catalog,我們基于postgres的catalog講解一下如何使用flink的catalog ,
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
通過JdbcCatalogUtils.createCatalog構造PostgresCatalog時這五個參數都是必填項,其中baseUrl要求是不能帶有數據庫名的
String catalogName = "mycatalog";
String defaultDatabase = "postgres";
String username = "postgres";
String pwd = "postgres";
String baseUrl = "jdbc:postgresql://localhost:5432/";
PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
catalogName,
defaultDatabase,
username,
pwd,
baseUrl);
訪問postgres 數據庫指定表名的時候完整的路徑名應該是以下格式:
<catalog>.<db>.`<schema.table>`
其中schema默認是public,如果是使用缺省值,public是可以省略的。比如下面的查詢語句:
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
如果非缺省schema,則不能被省略:
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
我們PostgresCatalog將注冊到StreamTableEnvironment 的變量tEnv中,然后就可以用tEnv進行一些操作了。
tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tEnv.useCatalog(postgresCatalog.getName());
System.out.println("list databases :");
String[] databases = tEnv.listDatabases();
Stream.of(databases).forEach(System.out::println);
tEnv.useDatabase(defaultDatabase);
System.out.println("list tables :");
String[] tables = tEnv.listTables(); // 也可以使用 postgresCatalog.listTables(defaultDatabase);
Stream.of(tables).forEach(System.out::println);
System.out.println("list functions :");
String[] functions = tEnv.listFunctions();
Stream.of(functions).forEach(System.out::println);
CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
defaultDatabase,
"table1"));
TableSchema tableSchema = catalogBaseTable.getSchema();
System.out.println("tableSchema --------------------- :");
System.out.println(tableSchema);
List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
.execute()
.collect());
results.stream().forEach(System.out::println);
tEnv.executeSql("insert into table1 values (3,'c')");
完整的代碼請參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java
這個類主要是對jdbc catalog一些公共的操作做了抽象.目前實現了實際功能的只有一個方法:getPrimaryKey,其他方式主要是對于Catalog的一些其他實現類做了特殊處理,比如類似create table 或者 alter table是不支持的,listView只是返回一個空列表,因為我們使用jdbc catalog主要是來做一些DML操作。
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
在這里面,主要是實現了一些常用的操作數據庫的方法,比如getTable、listTables、listDatabases等等,其實簡單的來說就是從postgres元數據庫里查詢出來相應的信息,然后組裝成flink的相關對象,返回給調用方。以一個簡單的方法listDatabases為例:
從元數據表pg_database中查詢所有的tablename,然后去掉內置的數據庫,也就是template0和template1,然后封裝到一個list對象里,返回。
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
有不兼容的地方需要做一些轉換,比如getTable方法,有些數據類型是不匹配的,要做一些類型的匹配,如postgres里面的serial和int4都會轉成flink的int類型,具體的參考下PostgresCatalog#fromJDBCType方法。
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
.........................
看完上述內容,你們對如何理解flink 1.11 中的JDBC Catalog有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。