本篇文章為大家展示了如何正確使用FlinkStreamSQL,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
項目路徑:https://github.com/DTStack/flinkStreamSQL
官方文檔:https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/docs/quickStart.md
首先,需要將項目從Github上導入到IDEA中(導入方法較多,這里介紹一種常用的)
從IDEA菜單欄里,Git 選項 -> Clone -> FlinkStreamSQL 項目地址,點擊Clone即可獲取FlinkStreamSQL 源碼一份! 
項目下載好后,默認分支是 1.11_release,對應的Flink版本是Flink 1.11.x(FlinkStreamSQL 的 release版本對應著 Flink 的 release 版本),需要其他版本的自行切換,推薦使用 1.10_release。

項目下載下來后,第一次編譯之前,先將整個項目maven reimport 一次

如果有缺少JAR包,在某度或某歌上搜索即可(項目本身并不依賴什么獨有的JAR包,畢竟是開源項目),或者在官方釘釘群的文件中搜索看看,會有意外發現。
上面操作沒有問題后,就可以開始編譯了。
編譯命令:
mvn clean package -DskipTests
打包結束后會生成對應的插件包文件夾,1.8 版本對應的是plugins,1.10 及 之后的版本對應的sqlplugins

如果有用不到的插件,可以在項目的root路徑下的pom中,注釋掉不需要使用的插件
【?。?!注意?。?!】【?。?!注意?。?!】【?。?!注意?。?!】
部分插件之間有依賴關系,所以在注釋的時候,請小心別把相關依賴的插件注釋掉
rdb模塊被所有關系型數據庫所依賴,包括impala 模塊(雖然它不是關系型數據庫,但是它使用了JDBC)
core模塊是所有模塊所依賴的,不能注釋??!
Launcher模塊是任務提交必備,不能注釋??!
Kafka-base模塊是kafka插件的基礎,如果使用了kafka插件(不管什么版本),不能注釋??!
1.10 及之后的版本,新增了dirtyData模塊,是用來提供臟數據指定存儲功能(比如將臟數據存儲到指定mysql數據庫中),不能注釋??!
【?。?!注意?。?!】【?。?!注意?。?!】【?。?!注意?。?!】
項目編譯完之后,就可以提交任務了。任務提交的方式有local、standalone、yarn-session、yarn-per-job模式,后續會支持application(需要等到1.12版本)
如果以下概念中,有不懂的,自行查資料了解(學會查資料,比問別人更有效率)
使用的idea版本是2020.3 公開版,有不一樣的地方自行修改
這里以yarn-per-job模式為例,其他模式類似,可以看文檔自行配置任務提交參數
1.配置idea-application

有個快捷的方法,找到LauncherMain,然后運行,在idea自動生成的application中修改,或者直接"Modify Run Configuration"

這里貼下自己一直使用的任務提交參數,需要的自行修改,每個參數具體什么意思,在官方參數文檔中也有詳細說明。
-name
Test
-mode
yarnPer
-sql
/dtstack/sql/test/JoinDemoFour.sql
-localSqlPluginPath
/IdeaProjects/StreamSQLOne/sqlplugins
-flinkconf
/dtstack/conf/flink
-yarnconf
/dtstack/conf/yarn
-flinkJarPath
/dtstack/flink-1.10.1/lib
-confProp
{\"metrics.latency.interval\":\"30000\",\"metrics.latency.granularity\":\"operator\",\"time.characteristic\":\"ProcessingTime\",\"disableChain\":\"true\"}
-pluginLoadMode
shipfile
-queue
b任務SQL怎么寫?這個根據自己的插件,去看對應的插件文檔,最基本的任務SQL框架是:
CREATE Source(源表) -> CREATE Side(維表,根據自己業務來確定是否需要) -> CREATE Sink(結果表) -> INSERT INTO Sink blablabla...(實際執行的業務SQL,這個必須要,不然任務執行個????)
這里也貼下日常使用的SQL,需要自行修改。
CREATE TABLE SourceOne ( id int, name varchar, age bigint, phone bigint, birth timestamp, todayTime time, todayDate date, money decimal, price double, wechat varchar, proName varchar ) WITH ( type = 'kafka11', bootstrapServers = 'kudu1:9092', zookeeperQuorum = 'kudu1:2181/kafka', offsetReset = 'latest', topic = 'tiezhu_in_one', enableKeyPartitions = 'false', topicIsPattern = 'false', parallelism = '1' ); CREATE TABLE DimOne ( id int, age bigint, name varchar, birth timestamp, PRIMARY KEY (id, age, name), period for system_time ) WITH ( type = 'mysql', url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false', userName = 'root', password = 'admin123', tableName = 'TestOne', parallelism = '1', cache = 'LRU', asyncCapacity = '100', asyncTimeout = '1000', errorLimit = '10', cacheTTLMs = '1000' ); CREATE VIEW ViewOne AS SELECT DO.age as age, SO.todayTime as todayTime, SO.todayDate as todayDate, SO.name as name, DO.id as id, DO.birth as birth, SO.proName as proName FROM SourceOne SO LEFT JOIN DimOne DO ON SO.id = DO.id; CREATE TABLE DimTwo ( id int, proName varchar, createDate date, createTime time, PRIMARY KEY (id), period for system_time ) WITH ( type = 'mysql', url = 'jdbc:mysql://k3:3306/tiezhu?useSSL=false', userName = 'root', password = 'admin123', tableName = 'TestDemoTwo', parallelism = '1', cache = 'LRU', asyncCapacity = '100', errorLimit = '10' ); CREATE View ViewTwo AS SELECT DimTwo.proName as proName, DimTwo.createDate as createDate, DimTwo.createTime as createTime, ViewOne.todayTime as todayTime, ViewOne.todayDate as todayDate, ViewOne.name as name, ViewOne.birth as birth, ViewOne.age as age, DimTwo.id as id FROM ViewOne LEFT JOIN DimTwo DimTwo ON ViewOne.id = DimTwo.id and '2020-10-28' = DimTwo.createDate and DimTwo.id >= 2; CREATE TABLE SinkOne ( id int, name varchar, age bigint, birth timestamp, todayTime time, todayDate date, createTime time, createDate date, proName varchar ) WITH ( type = 'kafka11', bootstrapServers = 'kudu1:9092', topic = 'tiezhu_out', parallelism = '1', updateMode = 'upsert' ); INSERT INTO SinkOne SELECT * FROM ViewTwo;
如果需要遠程調試,那么需要在flink-conf.yaml中增加Flink 的遠程調試配置,然后在idea中配置”JVM Remote“,在代碼塊中打斷點(這種方法還能調試Flink 本身的代碼)
env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
只需要修改標記的這兩個地方,如果是HA集群,需要根據日志修改(怎么看日志,怎么修改,自行查資料)

至此,任務遠程提交流程就這些。
如果嫌棄遠程調試,那么可以試試FlinkStreamSQL的本地調試,LocalTest模塊(這個模塊默認是注釋掉的,如果有需要,自行打開即可),使用方法很簡單,修改對應的參數,然后執行RUN 即可

但是【注意?。?!】LocalTest模塊的pom文件中有大部分常用的插件模塊,但是如果出現了類似"ClassNotFoundException",大概率是pom中沒有對應的插件模塊,同時需要注意,Kafka模塊因為有類沖突的存在,所以在LocalTest模塊中,Kafka模塊只能存在一種

上述內容就是如何正確使用FlinkStreamSQL,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。