這篇文章主要講解了“Flink CDC怎么監聽MySQL表”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink CDC怎么監聽MySQL表”吧!
// 前景提要:開啟mysql binlog監控。(目錄:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 為隱藏目錄。注意:binlog_format=ROW // 創建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 創建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制臺bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 將CDC數據源和下游數據表對接起來bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 將CDC數據與 kafka表對接起來bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW<times STRING, temp STRING>," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 將CDC數據存儲到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");
-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp` (
`id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`temp` decimal(10, 2) NOT NULL,
`times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
注意:此處 表中 temp 字段為 decimal 類型,在SQL中使用 DECIMAL 、DOUBLE 類型 存儲到hbase中都會出現亂碼問題,遂 都換成 STRING
感謝各位的閱讀,以上就是“Flink CDC怎么監聽MySQL表”的內容了,經過本文的學習后,相信大家對Flink CDC怎么監聽MySQL表這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。