# Flink 1.12 upsertSql in Detail
## Table of Contents
1. [Upsert Concepts](#upsert-concepts)
2. [Upsert Mode in Flink SQL](#upsert-mode-in-flink-sql)
3. [upsertSql Syntax](#upsertsql-syntax)
4. [Hands-On: Upsert from Kafka to MySQL](#hands-on-upsert-from-kafka-to-mysql)
5. [Performance Tuning and Common Issues](#performance-tuning-and-common-issues)
6. [Integrating with CDC](#integrating-with-cdc)
7. [Version Compatibility](#version-compatibility)
8. [Best Practices Summary](#best-practices-summary)
---
## Upsert Concepts
### What Is Upsert
Upsert (update + insert) is a hybrid operation: when a record with the same key already exists it is updated, otherwise it is inserted. In distributed systems this is a key primitive for keeping data consistent.
```sql
-- classic upsert in traditional (MySQL) SQL
INSERT INTO `table` (id, `value`)
VALUES (1, 'a')
ON DUPLICATE KEY UPDATE `value` = 'a';
```
## Upsert Mode in Flink SQL
Flink SQL has no separate upsert statement: a sink table runs in upsert mode as soon as its DDL declares a primary key. Flink cannot validate the constraint itself, so it must be declared `NOT ENFORCED`:
```sql
-- declaring a primary key switches the sink into upsert mode
CREATE TABLE upsert_table (
  id INT,
  name STRING,
  cnt BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'target_table',
  'username' = 'user',
  'password' = 'pass'
);
```
Connector support for upsert writes in Flink 1.12:

| Connector | Upsert support | Special configuration |
| --- | --- | --- |
| JDBC | ✓ | requires a declared PRIMARY KEY |
| Kafka (upsert-kafka) | ✓ | requires `key.format` |
| HBase | ✓ | rowkey field acts as the primary key |
| Elasticsearch | ✓ | `document-id.key-delimiter` for composite keys |
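Flink 1.12 is also the release that introduced the dedicated `upsert-kafka` connector (the plain `kafka` connector stays append-only). A minimal sketch, assuming a hypothetical `product_summary` topic with JSON keys and values:

```sql
-- upsert-kafka sink: the PRIMARY KEY columns become the Kafka message key,
-- and a NULL value encodes a delete (tombstone)
CREATE TABLE kafka_product_summary (
  product_id STRING,
  total_amount DECIMAL(10,2),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'product_summary',  -- hypothetical topic
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```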
## upsertSql Syntax
Open-source Flink 1.12 does not accept an `ON DUPLICATE KEY UPDATE` clause in its SQL grammar: a plain `INSERT INTO` against a table with a declared primary key already carries upsert semantics. The MySQL-style grammar below shows what the JDBC sink effectively issues against MySQL, and the scenarios that follow use it to illustrate the intended conflict handling:
```sql
INSERT INTO [catalog_name.][db_name.]table_name
  { VALUES (value1 [, value2]*) | SELECT query }
[ON DUPLICATE KEY UPDATE col1 = val1 [, col2 = val2]*]
```
Scenario 1: overwrite on primary-key conflict (MySQL-flavored, for illustration):
```sql
INSERT INTO user_actions
SELECT user_id, action_time, action_type
FROM kafka_source
ON DUPLICATE KEY UPDATE
  action_time = VALUES(action_time),
  action_type = VALUES(action_type);
```
Scenario 2: conditional update that keeps only the newest state per key (MySQL-flavored; a native Flink alternative follows below):
```sql
INSERT INTO order_states
SELECT order_id, status, update_time
FROM source_stream
ON DUPLICATE KEY UPDATE
  status = IF(VALUES(update_time) > update_time,
              VALUES(status),
              status);
```
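In open-source Flink 1.12 the same keep-the-latest semantics can be written natively with the documented deduplication pattern: `ROW_NUMBER()` per key, ordered by a time attribute. A sketch assuming `update_time` is the event-time attribute of `source_stream` and that `order_states` declares `order_id` as its primary key:

```sql
-- keep only the most recent status per order_id; the primary key on
-- order_states then turns each change into an upsert
INSERT INTO order_states
SELECT order_id, status, update_time
FROM (
  SELECT order_id, status, update_time,
         ROW_NUMBER() OVER (
           PARTITION BY order_id
           ORDER BY update_time DESC
         ) AS rn
  FROM source_stream
)
WHERE rn = 1;
```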
## Hands-On: Upsert from Kafka to MySQL
```sql
-- 1. Create the Kafka source table
CREATE TABLE kafka_orders (
  order_id STRING,
  product_id STRING,
  amount DECIMAL(10,2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'order-consumer',  -- placeholder; required by the default group-offsets startup mode
  'format' = 'json'
);
```
```sql
-- 2. Create the MySQL target table (primary key enables upsert writes)
CREATE TABLE mysql_order_summary (
  product_id STRING,
  total_amount DECIMAL(10,2),
  last_order_time TIMESTAMP(3),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/db',
  'table-name' = 'order_summary',
  'username' = 'user',
  'password' = 'pass'
);
```
```sql
-- 3. Run the upsert: the GROUP BY query produces an update stream, and
--    the sink's primary key turns every update into an upsert
INSERT INTO mysql_order_summary
SELECT
  product_id,
  SUM(amount) AS total_amount,
  MAX(order_time) AS last_order_time
FROM kafka_orders
GROUP BY product_id;
```
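The JDBC connector writes into a physical table that must already exist, so `order_summary` also needs MySQL-side DDL with a matching primary key; with that key in place, the sink issues `INSERT ... ON DUPLICATE KEY UPDATE` for the MySQL dialect. A sketch (column widths are assumptions):

```sql
-- run in MySQL, not in Flink
CREATE TABLE order_summary (
  product_id      VARCHAR(64) NOT NULL,  -- assumed width
  total_amount    DECIMAL(10,2),
  last_order_time DATETIME(3),
  PRIMARY KEY (product_id)
);
```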
## Performance Tuning and Common Issues
Batched writes (JDBC sink options):
```sql
'sink.buffer-flush.interval' = '1s',
'sink.buffer-flush.max-rows' = '100'
```
Dedicated sink parallelism:
```sql
'sink.parallelism' = '4'
```
State backend selection (flink-conf.yaml):
```yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
```
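Putting these options together, a tuned variant of the sink from the hands-on section could look like the sketch below; the values are starting points to benchmark, not recommendations, and the credentials are the same placeholders as above:

```sql
CREATE TABLE mysql_order_summary_tuned (
  product_id STRING,
  total_amount DECIMAL(10,2),
  last_order_time TIMESTAMP(3),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/db',
  'table-name' = 'order_summary',
  'username' = 'user',
  'password' = 'pass',
  'sink.buffer-flush.interval' = '1s',   -- flush at least once per second
  'sink.buffer-flush.max-rows' = '100',  -- or as soon as 100 rows are buffered
  'sink.max-retries' = '3',              -- retry failed statements
  'sink.parallelism' = '4'               -- dedicated parallelism for the sink operator
);
```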
Common issues:

Missing primary key:
```
org.apache.flink.table.api.TableException: Table sink requires primary key but no primary key is defined
```
Fix: declare `PRIMARY KEY (...) NOT ENFORCED` on the target table.

Data type mismatch:
```
java.sql.SQLException: Incorrect datetime value
```
Fix: check that the TIMESTAMP precision matches the MySQL column; an explicit cast is sketched below.
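For the datetime error, one remedy is to pin the precision in the query so it matches the MySQL `DATETIME(3)` column; a sketch based on the hands-on example:

```sql
INSERT INTO mysql_order_summary
SELECT
  product_id,
  SUM(amount) AS total_amount,
  CAST(MAX(order_time) AS TIMESTAMP(3)) AS last_order_time  -- align precision explicitly
FROM kafka_orders
GROUP BY product_id;
```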
## Integrating with CDC
With the `debezium-json` format, a Kafka topic carrying Debezium change events becomes a changelog source that upsert sinks can consume directly:
```sql
CREATE TABLE cdc_source (
  id INT,
  name STRING,
  description STRING,
  update_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'inventory.db.customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'cdc-consumer',  -- placeholder consumer group
  'format' = 'debezium-json'
);
```
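The INSERT below targets `doris_output`, which the article never defines. As a hedged stand-in: Doris frontends speak the MySQL protocol, so a JDBC-based definition works for illustration (the dedicated flink-doris-connector is the usual production choice; host, port, and table names here are hypothetical):

```sql
CREATE TABLE doris_output (
  id INT,
  name STRING,
  description STRING,
  update_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://doris-fe:9030/inventory',  -- hypothetical FE endpoint
  'table-name' = 'customers',
  'username' = 'user',
  'password' = 'pass'
);
```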
```sql
-- Sync CDC changes into the OLAP system: the sink's primary key turns
-- the changelog (inserts, updates, deletes) into upsert writes
INSERT INTO doris_output
SELECT * FROM cdc_source;
```
How Debezium changelog operations map onto an upsert sink:

| Changelog operation | Sink behavior |
| --- | --- |
| INSERT | row inserted |
| UPDATE | row updated by primary key |
| DELETE | depends on the sink; the JDBC sink deletes by primary key, while some sinks offer an option to ignore deletes |
## Version Compatibility

| Feature | Community edition | Enterprise enhancements |
| --- | --- | --- |
| Basic upsert | ✓ | ✓ |
| Automatic retry | partial (`sink.max-retries` on the JDBC sink) | ✓ |
| Conflict resolution | simple overwrite | conditional update |
## Best Practices Summary
- Take a savepoint before migrating or upgrading an upsert job so the sink state restores consistently.
- Monitor the sink metrics `sink.num-records-out` and `sink.num-records-out-errors` to catch write failures early.