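# How to Compute Today's PV in Real Time with Flink SQL and Write It to MySQL
## Preface
In real-time big data processing, Apache Flink has become one of the de facto standards. Flink SQL offers a declarative programming model and a low barrier to entry, so developers can implement real-time data processing requirements quickly. This article explains how to use Flink SQL to compute the current day's page views (PV) in real time and write the results to a MySQL database.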
## 1. Environment Setup
### 1.1 Required Components
- Apache Flink 1.13+ (with Flink SQL support)
- MySQL 5.7+
- Kafka (as the data source)
### 1.2 Maven Dependencies
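```xml
<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- Needed to run the job locally, e.g. from an IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- Kafka source, plus the JSON format used by 'format' = 'json' -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- MySQL connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <!-- RocksDB state backend (used in the performance section) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>
```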
## 2. Implementation
### 2.1 Data Source
Assume a Kafka topic `user_events` that carries user access logs such as:
```json
{
  "user_id": "user123",
  "page_url": "/product/123",
  "access_time": "2023-07-20 14:30:00",
  "ip_address": "192.168.1.1"
}
```
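### 2.2 Create the Kafka Source Table
Register the topic as a Flink SQL table. The `WATERMARK` clause makes `access_time` the event-time attribute and tolerates events that arrive up to 5 seconds out of order.

```sql
CREATE TABLE user_events (
  user_id STRING,
  page_url STRING,
  access_time TIMESTAMP(3),
  ip_address STRING,
  WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'pv-group',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);
```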
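To make the `WATERMARK` clause concrete, the following plain-Java sketch models a bounded-out-of-orderness watermark. It uses a hypothetical `WatermarkSketch` class, which is not a Flink API. The watermark trails the largest event time seen so far by 5 seconds, and an event whose time is not after the current watermark counts as late.

This is a simplification in two ways:
- Real Flink emits watermarks periodically (see `pipeline.auto-watermark-interval`) rather than after every record.
- Lateness only matters to event-time operators such as windows. The continuous `GROUP BY` used below does not drop late rows.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical model of WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND (not Flink code). */
public class WatermarkSketch {
    private final Duration delay;
    private LocalDateTime maxSeen; // largest event time observed so far, null before the first event

    public WatermarkSketch(Duration delay) {
        this.delay = delay;
    }

    /** Watermark = max event time seen minus the delay, or null before the first event. */
    public LocalDateTime currentWatermark() {
        return maxSeen == null ? null : maxSeen.minus(delay);
    }

    /** Records an event; returns true if it was late, i.e. not after the watermark in effect on arrival. */
    public boolean observe(LocalDateTime eventTime) {
        LocalDateTime wm = currentWatermark();
        boolean late = wm != null && !eventTime.isAfter(wm);
        if (maxSeen == null || eventTime.isAfter(maxSeen)) {
            maxSeen = eventTime; // the watermark only ever moves forward
        }
        return late;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofSeconds(5));
        System.out.println(wm.observe(LocalDateTime.parse("2023-07-20T14:30:10"))); // false
        System.out.println(wm.currentWatermark());                                 // 2023-07-20T14:30:05
        System.out.println(wm.observe(LocalDateTime.parse("2023-07-20T14:30:07"))); // false: out of order but within 5 s
        System.out.println(wm.observe(LocalDateTime.parse("2023-07-20T14:30:04"))); // true: behind the watermark
    }
}
```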
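### 2.3 Create the MySQL Sink Table
Declare the MySQL result table with the JDBC connector. Buffered rows are flushed every second or every 100 rows, whichever comes first.

```sql
CREATE TABLE daily_pv (
  calc_date DATE,
  pv_count BIGINT,
  update_time TIMESTAMP(3),
  PRIMARY KEY (calc_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/analytics',
  'table-name' = 'daily_pv_stats',
  'username' = 'flink_user',
  'password' = 'flink_pwd',
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '100'
);
```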
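The JDBC connector writes into an existing MySQL table; it does not create `daily_pv_stats` itself. Below is a minimal sketch for creating that table from Java, under these assumptions:
- The connection settings are the ones in the Flink DDL above.
- The `mysql-connector-java` driver is on the classpath.
- The column types are one reasonable mapping: MySQL `DATETIME(3)` for Flink `TIMESTAMP(3)`.
- `calc_date` is the primary key, so that upserts can replace rows.

The class name `CreateDailyPvTable` is hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical one-off setup program for the physical MySQL result table. */
public class CreateDailyPvTable {
    /** MySQL DDL; calc_date must be the primary key for the upsert sink to overwrite rows. */
    public static final String DDL =
            "CREATE TABLE IF NOT EXISTS daily_pv_stats ("
                    + " calc_date DATE NOT NULL,"
                    + " pv_count BIGINT NOT NULL,"
                    + " update_time DATETIME(3),"
                    + " PRIMARY KEY (calc_date))";

    public static void main(String[] args) throws SQLException {
        // Host, database and credentials copied from the Flink sink DDL; adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://mysql:3306/analytics", "flink_user", "flink_pwd");
             Statement stmt = conn.createStatement()) {
            stmt.execute(DDL);
        }
    }
}
```

Running it once before starting the Flink job is enough, because `IF NOT EXISTS` makes repeated runs harmless.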
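### 2.4 Compute Today's PV
A continuous `GROUP BY` on the calendar date keeps one running count per day. It sends the updated count to the sink whenever a new event arrives.

```sql
-- Compute today's PV (by calendar day)
INSERT INTO daily_pv
SELECT
  CAST(access_time AS DATE) AS calc_date,
  COUNT(*) AS pv_count,
  CURRENT_TIMESTAMP AS update_time
FROM user_events
WHERE
  -- Optional: filter out invalid records
  user_id IS NOT NULL
  AND page_url IS NOT NULL
  -- Only process today's data (this condition can be dropped for a T+1 calculation)
  AND CAST(access_time AS DATE) = CURRENT_DATE
GROUP BY CAST(access_time AS DATE);
```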
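The following plain-Java model mimics what the query does for each incoming record. It uses a hypothetical `DailyPvSimulation` class, which is not Flink code. For each record it:
1. Applies the `WHERE` filter.
2. Derives the date key.
3. Increments that date's count.
4. Emits the new count.

`CURRENT_DATE` is modeled as a `today` parameter, and the sample events are made up for illustration. Each emitted row supersedes the previous one for the same `calc_date`. Inside Flink this is an update to the earlier result, which the primary-key sink writes as an upsert.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-threaded model of the continuous GROUP BY query (not Flink code). */
public class DailyPvSimulation {
    // Aggregation state: one running count per calendar date
    private final Map<LocalDate, Long> counts = new HashMap<>();

    /** Returns the updated "date=count" row, or null when the WHERE clause filters the event out. */
    public String onEvent(String userId, String pageUrl, LocalDateTime accessTime, LocalDate today) {
        if (userId == null || pageUrl == null) {
            return null; // user_id IS NOT NULL AND page_url IS NOT NULL
        }
        LocalDate date = accessTime.toLocalDate(); // CAST(access_time AS DATE)
        if (!date.equals(today)) {
            return null; // CAST(access_time AS DATE) = CURRENT_DATE
        }
        long pv = counts.merge(date, 1L, Long::sum); // COUNT(*) ... GROUP BY date
        return date + "=" + pv;                      // update sent to the sink for this date
    }

    /** Feeds {user_id, page_url, access_time} triples through the model and collects emitted rows. */
    public static List<String> simulate(String[][] events, LocalDate today) {
        DailyPvSimulation sim = new DailyPvSimulation();
        List<String> emitted = new ArrayList<>();
        for (String[] e : events) {
            String row = sim.onEvent(e[0], e[1], LocalDateTime.parse(e[2]), today);
            if (row != null) {
                emitted.add(row);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"user123", "/product/123", "2023-07-20T14:30:00"},
            {"user456", "/product/456", "2023-07-20T14:31:00"},
            {null, "/product/789", "2023-07-20T14:32:00"},   // dropped: user_id is null
            {"user789", "/home", "2023-07-19T23:59:59"},     // dropped: not today
            {"user123", "/cart", "2023-07-20T14:33:00"}
        };
        System.out.println(simulate(events, LocalDate.parse("2023-07-20")));
        // prints [2023-07-20=1, 2023-07-20=2, 2023-07-20=3]
    }
}
```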
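### 2.5 Submit the Job from Java
The DDL and query above can be submitted from a Java program. Calling `executeSql()` on an `INSERT` statement submits the streaming job by itself, so `env.execute()` is not needed. Calling it would fail, because no DataStream operators have been defined.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RealtimePVCalculator {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Checkpoint every 30 s so the job can recover after a failure; with the primary-key
        //    (upsert) sink, replayed records overwrite rows instead of double-counting them
        env.enableCheckpointing(30000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 3. Register the tables (use the full DDL shown above in place of "...")
        tableEnv.executeSql("CREATE TABLE user_events (...)");
        tableEnv.executeSql("CREATE TABLE daily_pv (...)");

        // 4. executeSql() on an INSERT submits the job itself; no env.execute() call
        TableResult result = tableEnv.executeSql("INSERT INTO daily_pv SELECT ...");
        // Block until the job finishes (useful when running locally from an IDE)
        result.await();
    }
}
```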
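## 3. Advanced Topics
### 3.1 Handling Late Data
The watermark in the source table controls how out-of-order events are handled. Increase its delay if events can arrive more than 5 seconds late. As an alternative to the continuous `GROUP BY`, the daily count can also be computed with a 1-day event-time tumbling window. `TUMBLE_START` returns a `TIMESTAMP(3)`, so it is cast to `DATE` to match `calc_date`.

```sql
-- Source table: the watermark delay sets how far out of order events may arrive
CREATE TABLE user_events (
  ...
  WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (...);

-- Daily PV with a 1-day event-time tumbling window
INSERT INTO daily_pv
SELECT
  CAST(TUMBLE_START(access_time, INTERVAL '1' DAY) AS DATE) AS calc_date,
  COUNT(*) AS pv_count,
  CURRENT_TIMESTAMP AS update_time
FROM user_events
GROUP BY TUMBLE(access_time, INTERVAL '1' DAY);
```

This version comes with a trade-off:
- The window emits a single, final row per day, and only after the watermark passes the end of that day. MySQL is therefore not updated during the day.
- Events that arrive after the window has fired are dropped.

The continuous `GROUP BY` shown earlier updates the count in real time instead.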
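The sketch below shows when the window version actually writes to MySQL. It uses a hypothetical `DailyTumbleSketch` class in plain Java and does two things:
- It assigns an event to its 1-day window with the same start formula that Flink's `TimeWindow.getWindowStartWithOffset` uses (offset 0), treating the zone-less `TIMESTAMP(3)` value as UTC milliseconds.
- It checks whether a watermark has reached the window's last millisecond.

With the 5-second watermark delay, the 2023-07-20 row is emitted only after an event stamped 2023-07-21 00:00:05 or later arrives. Day boundaries for a `TIMESTAMP_LTZ` rowtime depend on the session time zone and are not modeled here.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical model of a 1-day event-time tumbling window on a TIMESTAMP(3) rowtime (not Flink code). */
public class DailyTumbleSketch {
    public static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** TIMESTAMP(3) has no time zone: treat its wall-clock value as UTC epoch milliseconds. */
    public static long toMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static LocalDateTime fromMillis(long ms) {
        return Instant.ofEpochMilli(ms).atOffset(ZoneOffset.UTC).toLocalDateTime();
    }

    /** Window start: timestamp - (timestamp - offset + size) % size, with offset = 0. */
    public static long windowStart(long tsMs) {
        return tsMs - (tsMs + DAY_MS) % DAY_MS;
    }

    /** The window [start, start + 1 day) fires once the watermark reaches its last millisecond. */
    public static boolean fires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + DAY_MS - 1;
    }

    public static void main(String[] args) {
        long start = windowStart(toMillis(LocalDateTime.parse("2023-07-20T14:30:00")));
        System.out.println(fromMillis(start)); // 2023-07-20T00:00
        // Watermark = latest event time - 5 s, as in the WATERMARK clause
        long wmAfter3s = toMillis(LocalDateTime.parse("2023-07-21T00:00:03")) - 5000;
        long wmAfter5s = toMillis(LocalDateTime.parse("2023-07-21T00:00:05")) - 5000;
        System.out.println(fires(start, wmAfter3s)); // false
        System.out.println(fires(start, wmAfter5s)); // true
    }
}
```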
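### 3.2 Preventing Duplicate Rows
The Flink sink table must declare a primary key, as the DDL above already does:

```sql
PRIMARY KEY (calc_date) NOT ENFORCED
```

With a primary key, the JDBC connector runs in upsert mode. For MySQL it writes with `INSERT ... ON DUPLICATE KEY UPDATE`, so each new count for a date overwrites the existing row instead of adding a duplicate. For this to work, the physical MySQL table `daily_pv_stats` must also use `calc_date` as its primary (or unique) key.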
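A small plain-Java sketch shows the difference a primary key makes. It uses a hypothetical `UpsertSketch` class, not the connector's code. The same three successive counts for one day produce three rows when appended, but a single row holding the latest value when written by key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical contrast between append-only and primary-key (upsert) writes; not the connector's code. */
public class UpsertSketch {
    /** Without a key, every update becomes a new row. */
    public static List<String> applyAppend(String[][] updates) {
        List<String> table = new ArrayList<>();
        for (String[] u : updates) {
            table.add(u[0] + "=" + u[1]);
        }
        return table;
    }

    /** With calc_date as the key, a later update replaces the earlier row (last write wins). */
    public static Map<String, Long> applyUpsert(String[][] updates) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String[] u : updates) {
            table.put(u[0], Long.parseLong(u[1]));
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] updates = {{"2023-07-20", "1"}, {"2023-07-20", "2"}, {"2023-07-20", "3"}};
        System.out.println(applyAppend(updates)); // [2023-07-20=1, 2023-07-20=2, 2023-07-20=3]
        System.out.println(applyUpsert(updates)); // {2023-07-20=3}
    }
}
```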
## 4. Performance Optimization
Parallelism:

```java
env.setParallelism(4);
```
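Parallelism speeds up reading and parsing Kafka records. The aggregation, however, is keyed by `calc_date`, and on any given day every event carries the same key.

The sketch below shows the consequence: all of today's records land on one subtask no matter how high the parallelism is. It uses a hypothetical `KeyRoutingSketch` class with simplified hash routing, not Flink's actual key-group assignment.

If that subtask becomes a bottleneck at peak traffic, two Flink SQL options are worth evaluating:
- mini-batch aggregation (`table.exec.mini-batch.*`)
- two-phase aggregation (`table.optimizer.agg-phase-strategy`)

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified key routing; Flink maps keys to key groups first, but the effect is the same. */
public class KeyRoutingSketch {
    /** Picks the parallel subtask that owns a key (simplified hash partitioning). */
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many records each subtask receives. */
    public static Map<Integer, Integer> route(Object[] keys, int parallelism) {
        Map<Integer, Integer> perSubtask = new TreeMap<>();
        for (Object key : keys) {
            perSubtask.merge(subtaskFor(key, parallelism), 1, Integer::sum);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        Object[] keys = new Object[1000];
        Arrays.fill(keys, LocalDate.parse("2023-07-20")); // every event today has the same key
        System.out.println(route(keys, 4).size());        // 1: a single subtask does all the counting
    }
}
```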
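State backend (RocksDB, provided by the `flink-statebackend-rocksdb_2.12` dependency):

```java
// import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
// Flink 1.13 API; the older RocksDBStateBackend(String) constructor is deprecated
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
```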
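Batched MySQL writes (options in the sink table's `WITH` clause):

```sql
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '2s'
```

Larger batches mean fewer round trips to MySQL, at the cost of slightly staler results.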
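The two buffer options work together:
- A batch is written when it reaches `max-rows`.
- A timer writes whatever has accumulated every `interval`.

The JDBC sink also flushes when a checkpoint is taken. The sketch below models only the first two rules, using simulated time (one row per millisecond) and the 500-row / 2 s settings above. `BufferFlushSketch` is a hypothetical class, not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval'. */
public class BufferFlushSketch {
    private final int maxRows;
    private int buffered = 0;
    private final List<Integer> flushedBatches = new ArrayList<>();

    public BufferFlushSketch(int maxRows) {
        this.maxRows = maxRows;
    }

    /** Called for every row the sink receives; a full buffer is written immediately. */
    public void add() {
        buffered++;
        if (buffered >= maxRows) {
            flush();
        }
    }

    /** Called by the periodic timer once per flush interval; writes any partial batch. */
    public void onTimer() {
        if (buffered > 0) {
            flush();
        }
    }

    private void flush() {
        flushedBatches.add(buffered); // stands for one batched statement execution against MySQL
        buffered = 0;
    }

    /** Rows arrive one per millisecond starting at t = 0; the timer fires every intervalMs. */
    public static List<Integer> simulate(int maxRows, long intervalMs, int rows, long durationMs) {
        BufferFlushSketch sink = new BufferFlushSketch(maxRows);
        for (long t = 0; t <= durationMs; t++) {
            if (t < rows) {
                sink.add();
            }
            if (t > 0 && t % intervalMs == 0) {
                sink.onTimer();
            }
        }
        return sink.flushedBatches;
    }

    public static void main(String[] args) {
        System.out.println(simulate(500, 2000, 1200, 2000)); // [500, 500, 200]
    }
}
```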
Monitoring, via the Flink Web UI or a metrics reporter:
- Number of input records
- Number of output records
- Latency
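## 5. Production Practices
Kafka offset management (resume from the offsets committed for `properties.group.id`):

```sql
'scan.startup.mode' = 'group-offsets'
```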
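With `group-offsets`, a fresh start resumes from the offsets committed for `properties.group.id`. Flink commits these back to Kafka when a checkpoint completes. Restoring from a checkpoint or savepoint, by contrast, always uses the offsets stored in Flink's state.

The sketch below encodes that precedence with hypothetical names (`StartOffsetSketch`, `startOffset`). When the group has no committed offset, it falls back according to the Kafka consumer's `auto.offset.reset` property. Treat that fallback as an assumption to verify for your connector version.

```java
import java.util.OptionalLong;

/** Hypothetical, simplified start-offset decision for one Kafka partition (not Flink's code). */
public class StartOffsetSketch {
    /**
     * @param checkpointOffset offset restored from Flink checkpoint/savepoint state, if any
     * @param committedOffset  offset committed to Kafka for properties.group.id, if any
     * @param resetToLatest    assumed auto.offset.reset fallback (true = latest, false = earliest)
     */
    public static long startOffset(OptionalLong checkpointOffset, OptionalLong committedOffset,
                                   boolean resetToLatest, long earliest, long latest) {
        if (checkpointOffset.isPresent()) {
            return checkpointOffset.getAsLong(); // restored state wins over scan.startup.mode
        }
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();  // 'scan.startup.mode' = 'group-offsets'
        }
        return resetToLatest ? latest : earliest; // the group has not committed an offset yet
    }

    public static void main(String[] args) {
        // Fresh start; the group previously committed offset 120 in a partition spanning [0, 500)
        System.out.println(startOffset(OptionalLong.empty(), OptionalLong.of(120), true, 0, 500)); // 120
    }
}
```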
Retries for MySQL writes:

```sql
'sink.max-retries' = '3'
```
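`sink.max-retries` bounds how many times a failed batch write is retried before the job fails. The job then restarts from the last checkpoint according to its restart strategy.

The hypothetical `RetrySketch` helper below shows the bounded-retry pattern in plain Java:
- at most `maxRetries + 1` attempts,
- a linear back-off between attempts,
- the last error rethrown once retries are exhausted.

The real connector's back-off and reconnection details may differ.

```java
import java.util.concurrent.Callable;

/** Hypothetical bounded-retry helper illustrating 'sink.max-retries'; not the JDBC connector's code. */
public class RetrySketch {
    /** Runs action up to maxRetries + 1 times and returns the number of attempts used. */
    public static int runWithRetries(Callable<Void> action, int maxRetries, long backoffMs) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                action.call();
                return attempt + 1;
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the failure so the job can fail over
                }
                Thread.sleep(backoffMs * (attempt + 1)); // linear back-off before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        int attempts = runWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new java.sql.SQLException("simulated MySQL error"); // first two writes fail
            }
            return null;
        }, 3, 100);
        System.out.println(attempts); // 3
    }
}
```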
Checkpoint timeout (60 s):

```java
env.getCheckpointConfig().setCheckpointTimeout(60000);
```
Other points to plan for:
- Data accuracy validation
- Resource isolation
- Version management
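## 6. Summary
This article implemented a complete real-time PV calculation with Flink SQL. The approach has the following advantages:

In production you also need to plan for peak business traffic, cluster resource allocation and similar concerns. We hope this article is a useful reference for your own real-time computing work.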