溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

FLINK 1.12 upsertSql怎么使用

發布時間:2021-12-22 11:46:42 來源:億速云 閱讀:768 作者:iii 欄目:大數據
# FLINK 1.12 upsertSql 使用詳解

## 目錄
1. [Upsert 概念解析](#upsert-概念解析)  
2. [Flink SQL 中的 Upsert 模式](#flink-sql-中的-upsert-模式)  
3. [upsertSql 語法詳解](#upsertsql-語法詳解)  
4. [實戰:Kafka 到 MySQL 的 Upsert 示例](#實戰kafka-到-mysql-的-upsert-示例)  
5. [性能優化與常見問題](#性能優化與常見問題)  
6. [與 CDC 的集成實踐](#與-cdc-的集成實踐)  
7. [版本兼容性說明](#版本兼容性說明)  
8. [最佳實踐總結](#最佳實踐總結)  

---

## Upsert 概念解析

### 什么是 Upsert
Upsert(Update + Insert)是一種混合操作,當記錄存在時更新,不存在時插入。在分布式系統中,這是保證數據一致性的關鍵操作。

```sql
-- 傳統SQL示例
INSERT INTO table (id, value) 
VALUES (1, 'a') 
ON DUPLICATE KEY UPDATE value = 'a';

Flink 中的實現挑戰

  1. 無界數據流:需要處理持續變化的鍵狀態
  2. 精確一次語義:確保在故障恢復時不重復處理
  3. 狀態管理:高效維護鍵值索引

Flink SQL 中的 Upsert 模式

核心配置參數

-- 啟用upsert模式
CREATE TABLE upsert_table (
    id INT PRIMARY KEY,
    name STRING,
    cnt BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'target_table',
    'username' = 'user',
    'password' = 'pass',
    'sink.upsert-materialize' = 'NONE' -- 可選:FORCE/NONE
);

支持的 Connector 類型

Connector 是否支持 特殊配置
JDBC ? 需定義PRIMARY KEY
Kafka ? 需配置key.format
HBase ? 需配置rowkey
Elasticsearch ? 需配置document-id.key-delimiter

upsertSql 語法詳解

完整語法結構

INSERT INTO [catalog_name.][db_name.]table_name
  { VALUES (value1 [, value2]*) | SELECT query }
  [ON DUPLICATE KEY UPDATE col1 = val1 [, col2 = val2]*]

典型場景示例

場景1:主鍵沖突更新

INSERT INTO user_actions
SELECT user_id, action_time, action_type 
FROM kafka_source
ON DUPLICATE KEY UPDATE 
    action_time = VALUES(action_time),
    action_type = VALUES(action_type);

場景2:條件更新

INSERT INTO order_states
SELECT order_id, status, update_time
FROM source_stream
ON DUPLICATE KEY UPDATE
    status = IF(VALUES(update_time) > update_time, 
              VALUES(status), 
              status);

實戰:Kafka 到 MySQL 的 Upsert 示例

完整 Pipeline 示例

-- 1. 創建Kafka源表
CREATE TABLE kafka_orders (
    order_id STRING,
    product_id STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 2. 創建MySQL目標表
CREATE TABLE mysql_order_summary (
    product_id STRING PRIMARY KEY,
    total_amount DECIMAL(10,2),
    last_order_time TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/db',
    'table-name' = 'order_summary',
    'username' = 'user',
    'password' = 'pass'
);

-- 3. 執行Upsert操作
INSERT INTO mysql_order_summary
SELECT 
    product_id,
    SUM(amount) as total_amount,
    MAX(order_time) as last_order_time
FROM kafka_orders
GROUP BY product_id
ON DUPLICATE KEY UPDATE
    total_amount = VALUES(total_amount),
    last_order_time = VALUES(last_order_time);

關鍵配置說明

  1. kafka.source.auto.offset.reset: latest/earliest
  2. jdbc.sink.max-retries: 建議設置為3
  3. table.exec.state.ttl: 設置狀態保留時間

性能優化與常見問題

優化技巧

  1. 批量寫入

    'sink.buffer-flush.interval' = '1s',
    'sink.buffer-flush.max-rows' = '100'
    
  2. 異步提交

    'sink.parallelism' = '4'
    
  3. 狀態后端選擇

    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints
    

常見錯誤排查

  1. 主鍵沖突

    org.apache.flink.table.api.TableException: Table sink requires primary key but no primary key is defined
    

    解決方案:確認目標表PRIMARY KEY定義

  2. 數據類型不匹配

    java.sql.SQLException: Incorrect datetime value
    

    解決方案:檢查TIMESTAMP精度設置


與 CDC 的集成實踐

Debezium + Flink Upsert 示例

CREATE TABLE cdc_source (
    id INT,
    name STRING,
    description STRING,
    update_ts TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'inventory.db.customers',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

-- 將CDC變更同步到OLAP系統
INSERT INTO doris_output
SELECT * FROM cdc_source
ON DUPLICATE KEY UPDATE
    name = VALUES(name),
    description = VALUES(description);

變更日志處理策略

操作類型 處理方式
INSERT 直接插入
UPDATE 根據主鍵更新
DELETE 需配置sink.ignore-delete

版本兼容性說明

Flink 1.12 特性矩陣

功能 社區版 企業版增強
基本Upsert ? ?
自動重試 × ?
沖突解決策略 簡單覆蓋 條件更新

升級注意事項

  1. 從1.11升級:需要重建狀態
  2. 與Checkpoint兼容性:建議使用Savepoint遷移
  3. Connector API變化:部分參數需要調整

最佳實踐總結

設計原則

  1. 主鍵設計:選擇業務不可變字段
  2. 更新策略:明確業務沖突解決邏輯
  3. 監控指標:關注sink.num-records-outsink.num-records-out-errors

典型應用場景

  1. 實時聚合結果更新
  2. 維度表變更同步
  3. 跨系統數據一致性保證

擴展閱讀

”`

注:實際文檔應包含更多示例代碼、性能測試數據和架構圖。本文檔框架可根據需要擴展具體章節的詳細內容。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女