設置允許延遲的時間是通過allowedLateness(lateness: Time)設置
保存延遲數據則是通過sideOutputLateData(outputTag: OutputTag[T])保存
獲取延遲數據是通過DataStream.getSideOutput(tag: OutputTag[X])獲取
下面先分別講解這幾個方法,再給出具體的實例加深理解
1、allowedLateness(lateness: Time)
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
? ? javaStream.allowedLateness(lateness)
? ? this
}
該方法傳入一個Time值,設置允許數據遲到的時間,這個時間和waterMark中的時間概念不同。再來回顧一下,
waterMark=數據的事件時間-允許亂序時間值
隨著新數據的到來,waterMark的值會更新為最新數據事件時間-允許亂序時間值,但是如果這時候來了一條歷史數據,waterMark值則不會更新??偟膩碚f,waterMark是為了能接收到盡可能多的亂序數據。
那這里的Time值呢?主要是為了等待遲到的數據,在一定時間范圍內,如果屬于該窗口的數據到來,仍會進行計算,后面會對計算方式仔細說明
注意:該方法只針對于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值則會拋出異常
2、sideOutputLateData(outputTag: OutputTag[T])
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
? ? javaStream.sideOutputLateData(outputTag)
? ? this
}
該方法是將遲來的數據保存至給定的outputTag參數,而OutputTag則是用來標記延遲數據的一個對象。
3、DataStream.getSideOutput(tag: OutputTag[X])
通過window等操作返回的DataStream調用該方法,傳入標記延遲數據的對象來獲取延遲的數據
4、對延遲數據的理解
延遲數據是指:
在當前窗口【假設窗口范圍為10-15】已經計算之后,又來了一個屬于該窗口的數據【假設事件時間為13】,這時候仍會觸發window操作,這種數據就稱為延遲數據。
那么問題來了,延遲時間怎么計算呢?
假設窗口范圍為10-15,延遲時間為2s,則只要waterMark<15+2,并且屬于該窗口,就能觸發window操作。而如果來了一條數據使得waterMark>=15+2,10-15這個窗口就不能再觸發window操作,即使新來的數據的event-time<15+2+3
5、代碼實例講解
大概講解一下代碼的流程:
1、監聽某主機的9000端口,讀取socket數據(格式為? name:timestamp)
2、給當前進入flink程序的數據加上waterMark,值為eventTime-3s
3、根據name值進行分組,根據窗口大小為5s劃分窗口,設置允許遲到時間為2s,依次統計窗口中各name值的數據
4、輸出統計結果以及遲到數據
5、啟動Job
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
?
import scala.collection.mutable.ArrayBuffer
?
/**
? * 延遲測試
? * 詳細講解博客地址:https://blog.csdn.net/hlp4207/article/details/90717905
? */
object WaterMarkFunc02 {
? // 線程安全的時間格式化對象
? val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")
?
? def main(args: Array[String]): Unit = {
? ? val hostName = "s102"
? ? val port = 9000
? ? val delimiter = '\n'
? ? val env = StreamExecutionEnvironment.getExecutionEnvironment
? ? // 將EventTime設置為流數據時間類型
? ? env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
? ? env.setParallelism(1)
? ? val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)
? ? import org.apache.flink.api.scala._
? ? val data = streams.map(data => {
? ? ? // 輸入數據格式:name:時間戳
? ? ? // flink:1559223685000
? ? ? try {
? ? ? ? val items = data.split(":")
? ? ? ? (items(0), items(1).toLong)
? ? ? } catch {
? ? ? ? case _: Exception => println("輸入數據不符合格式:" + data)
? ? ? ? ? ("0", 0L)
? ? ? }
? ? }).filter(data => !data._1.equals("0") && data._2 != 0L)
?
? ? //為數據流中的元素分配時間戳,并定期創建水印以監控事件時間進度
? ? val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
? ? ? // 事件時間
? ? ? var currentMaxTimestamp = 0L
? ? ? val maxOutOfOrderness = 3000L
? ? ? var lastEmittedWatermark: Long = Long.MinValue
?
? ? ? // Returns the current watermark
? ? ? override def getCurrentWatermark: Watermark = {
? ? ? ? // 允許延遲三秒
? ? ? ? val potentialWM = currentMaxTimestamp - maxOutOfOrderness
? ? ? ? // 保證水印能依次遞增
? ? ? ? if (potentialWM >= lastEmittedWatermark) {
? ? ? ? ? lastEmittedWatermark = potentialWM
? ? ? ? }
? ? ? ? new Watermark(lastEmittedWatermark)
? ? ? }
?
? ? ? // Assigns a timestamp to an element, in milliseconds since the Epoch
? ? ? override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
? ? ? ? // 將元素的時間字段值作為該數據的timestamp
? ? ? ? val time = element._2
? ? ? ? if (time > currentMaxTimestamp) {
? ? ? ? ? currentMaxTimestamp = time
? ? ? ? }
? ? ? ? val outData = String.format("key: %s? ? EventTime: %s? ? waterMark:? %s",
? ? ? ? ? element._1,
? ? ? ? ? sdf.format(time),
? ? ? ? ? sdf.format(getCurrentWatermark.getTimestamp))
? ? ? ? println(outData)
? ? ? ? time
? ? ? }
? ? })
? ? val lateData = new OutputTag[(String,Long)]("late")
? ? val result: DataStream[String] = waterStream.keyBy(0)// 根據name值進行分組
? ? ? .window(TumblingEventTimeWindows.of(Time.seconds(5L)))// 5s跨度的基于事件時間的翻滾窗口
? ? /**
? ? ? * 對于此窗口而言,允許2秒的遲到數據,即第一次觸發是在watermark > end-of-window時
? ? ? * 第二次(或多次)觸發的條件是watermark < end-of-window + allowedLateness時間內,這個窗口有late數據到達
? ? ? */
? ? ? .allowedLateness(Time.seconds(2L))
? ? ? .sideOutputLateData(lateData)
? ? ? .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
? ? ? ? override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
? ? ? ? ? val timeArr = ArrayBuffer[String]()
? ? ? ? ? val iterator = input.iterator
? ? ? ? ? while (iterator.hasNext) {
? ? ? ? ? ? val tup2 = iterator.next()
? ? ? ? ? ? timeArr.append(sdf.format(tup2._2))
? ? ? ? ? }
? ? ? ? ? val outData = String.format("key: %s? ? data: %s? ? startTime:? %s? ? endTime:? %s",
? ? ? ? ? ? key.toString,
? ? ? ? ? ? timeArr.mkString("-"),
? ? ? ? ? ? sdf.format(window.getStart),
? ? ? ? ? ? sdf.format(window.getEnd))
? ? ? ? ? out.collect(outData)
? ? ? ? }
? ? ? })
? ? result.print("window計算結果:")
?
? ? val late = result.getSideOutput(lateData)
? ? late.print("遲到的數據:")
?
? ? env.execute(this.getClass.getName)
? }
}
接下來開始輸入數據進行測試驗證:
可以看到window范圍為【15-20】,這時候我們再輸入幾條屬于該范圍的數據:
輸入了事件時間為17、16、15三條數據,都觸發了window操作,那我們試著輸入一下窗口范圍為【10-15】的數據:
窗口范圍為【10-15】的數據則屬于遲到的數據,已經超過了最大等待時間,我們可以來試著計算一下允許上個窗口遲到數據的waterMark值
窗口結束時間+延遲時間=最大waterMark值
15 + 2 = 17
當前的waterMark值為20,大于17,所以窗口范圍為10-15的數據已經是遲到的數據了
再來計算一下窗口時間范圍為15-20的臨界值:
20 + 2 = 22
即當waterMark上漲到22,15-20窗口范圍內的數據就屬于遲到數據,不能再參與計算了
記住我們算出的臨界值22,繼續輸入數據測試:
輸入數據A時,waterMark上漲至21,此時輸入屬于15-20窗口范圍內的數據B,依然能觸發窗口操作;
輸入數據C,waterMark上漲至22,等于剛才我們算出來的臨界值,此時輸入,數據B,則已不能觸發窗口操作,屬于遲到的數據。
最后,總結一下flink對于延遲數據的處理:
如果延遲的數據有業務需要,則設置好允許延遲的時間,每個窗口都有屬于自己的最大等待延遲數據的時間限制:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。