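This article walks through how to use Watermarks in Flink. The example is practical, so it is shared here as a reference; hopefully you will take something useful away from it.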
Watermarks: assign an event time (a timestamp) to each element of the input stream, giving windows a way to handle out-of-order and late-arriving data.
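To make the idea concrete, here is a minimal plain-Java sketch (no Flink dependency) of a bounded-out-of-orderness watermark: the watermark trails the highest timestamp seen so far by a fixed delay. The class name, method names, and the timestamps in main are hypothetical and chosen only for illustration; this is not Flink's own implementation.

```java
// Plain-Java sketch of a bounded-out-of-orderness watermark.
// All names here are hypothetical; nothing in this block is a Flink API.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp seen so far. The initial value is the smallest one
    // that keeps the subtraction in currentWatermark() from overflowing.
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp; an older timestamp never lowers the maximum. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Watermark = highest timestamp seen - allowed delay - 1 ms. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    /** An event counts as late here if its timestamp is not after the current watermark. */
    boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(3500);
        // Made-up event timestamps in arrival order; 11000 and 12200 arrive out of order.
        long[] arrivals = {10_000, 12_000, 11_000, 16_000, 12_200};
        for (long ts : arrivals) {
            boolean late = wm.isLate(ts);
            wm.onEvent(ts);
            System.out.println("event=" + ts + " late=" + late + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a 3500 ms bound, the element stamped 11000 is out of order but still ahead of the watermark (8499), so it is not late. The element stamped 12200 arrives after 16000 has pushed the watermark to 12499, so it is reported as late.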
Example environment
java.version: 1.8.x
flink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
 * @Description Watermarks: assign an event time (a timestamp) to each element of the input stream,
 * giving windows a way to handle out-of-order and late-arriving data.
 */
public class TimestampsAndWatermarks {
    /**
     * Iterates over the sample records and prints results per gender. Each window's
     * computation is triggered automatically once the watermark passes the end of the window.
     * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
         * TimeCharacteristic offers three notions of time:
         * ProcessingTime: the system clock of the machine running the operator;
         * IngestionTime: the time at which a record enters the Flink streaming data flow;
         * EventTime: the timestamp carried by the record itself. The application must specify how to
         * extract it and how to generate watermarks, via assignTimestampsAndWatermarks.
         */
        // Use event time; timestamps must then be assigned to the events.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Interval at which periodic watermarks are generated. A longer interval avoids the
        // overhead of emitting watermarks too often on high-volume streams.
        env.getConfig().setAutoWatermarkInterval(1000L);
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                // The watermark advances as elements arrive. Once it passes the end of a window,
                // that window's computation is triggered; triggering windows is what watermarks are for.
                // Built-in alternative, commented out (it also needs import java.time.Duration).
                // Duration.ofSeconds(2) is the delay built into the watermark: once the highest event time
                // seen reaches the end of a window, the window waits this much longer before it fires.
                // .assignTimestampsAndWatermarks(
                //         WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                //                 .withTimestampAssigner((element, timestamp) -> {
                //                     long times = System.currentTimeMillis() ;
                //                     System.out.println(element.f1 + ","+ element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                //                     return times;
                //                 })
                // )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
                                // This demo uses the current system time as the element's timestamp; a real
                                // event-time job would read the timestamp from a field of the record.
                                long times = System.currentTimeMillis();
                                // The label 的水位線為 reads "'s watermark is"; the value printed is the timestamp just assigned.
                                System.out.println(element.f1 + "," + element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                // Partition the stream by key (f1, the gender field)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // 3-second tumbling event-time windows
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // Process all elements of one key in one window as a batch
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis();
                        System.out.println();
                        // The label 窗口處理時間 reads "window processing time".
                        System.out.println("窗口處理時間:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()) {
                            Tuple3<String, String, Integer> tuple3 = iterator.next();
                            total += tuple3.f2;
                            size++;
                            sex = tuple3.f1;
                        }
                        // Emit (gender, average of the f2 values); integer division truncates the average.
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }
    /**
     * Periodic watermark generator.
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>> {
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
                // Fixed out-of-orderness bound: 3.5 seconds
                private final long maxOutOfOrderness = 3500;
                // Highest event timestamp seen so far
                private long currentMaxTimestamp;

                /**
                 * Called for every event.
                 * @param event the stream element
                 * @param eventTimestamp the timestamp assigned to the element
                 * @param output the watermark output
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    // Track the highest timestamp seen, so an out-of-order element never moves it backwards.
                    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // Watermark = highest timestamp seen so far - allowed delay - 1 ms.
                    // A window fires once the watermark passes its end, so elements that arrive
                    // up to maxOutOfOrderness behind the newest one are still included in its result.
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }
    /**
     * Simulates a continuous stream of data.
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        // Set to false by cancel() so that run() stops emitting
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i = 0; i < 100 && running; i++) {
                // Cycle through the first six records
                if (i % 6 == 0) {
                    j = 0;
                }
                ctx.collect(tuple3List.get(j));
                // Emit one record per second
                Thread.sleep(1 * 1000);
                j++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Output (the label 的水位線為 reads "'s watermark is" and 窗口處理時間 reads "window processing time"):
man,張三的水位線為:2020-12-27 10:28:20
girl,李四的水位線為:2020-12-27 10:28:21
man,王五的水位線為:2020-12-27 10:28:22
girl,劉六的水位線為:2020-12-27 10:28:23
girl,伍七的水位線為:2020-12-27 10:28:24

窗口處理時間:2020-12-27 10:28:25
(man,20)
man,吳八的水位線為:2020-12-27 10:28:25
man,張三的水位線為:2020-12-27 10:28:26
girl,李四的水位線為:2020-12-27 10:28:27

窗口處理時間:2020-12-27 10:28:28
(girl,28)

窗口處理時間:2020-12-27 10:28:28
(man,29)
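The delay between an element's timestamp and the moment its window result is printed follows from simple arithmetic. The plain-Java sketch below (no Flink dependency) shows how a 3-second tumbling window is derived from a timestamp and when a watermark that trails by 3.5 seconds lets it fire. The class name, method names, and millisecond values are hypothetical, and the window-start formula assumes no window offset and non-negative timestamps.

```java
// Plain-Java sketch of tumbling-window assignment and event-time firing.
// All names and numbers are illustrative; nothing in this block is a Flink API.
final class TumblingWindowSketch {
    /** Start of the tumbling window containing timestampMs (no offset, non-negative timestamps assumed). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** An event-time window can fire once the watermark reaches its last millisecond (end - 1). */
    static boolean canFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long sizeMs = 3_000;   // window size, as in the example above
        long delayMs = 3_500;  // out-of-orderness bound, as in the example above
        long first = 20_000;   // made-up timestamp of the first element
        long start = windowStart(first, sizeMs);
        long end = windowEnd(first, sizeMs);
        // One element per second; the watermark trails the highest timestamp by delayMs + 1.
        for (long ts = first; ts <= 26_000; ts += 1_000) {
            long watermark = ts - delayMs - 1;
            System.out.println("max timestamp=" + ts + " watermark=" + watermark
                    + " window [" + start + "," + end + ") fires=" + canFire(end, watermark));
        }
    }
}
```

With these made-up numbers the element stamped 20000 falls into the window [18000, 21000), and that window first reports fires=true when the highest timestamp reaches 25000. This seems consistent with the output above, where the first element is stamped 10:28:20 and the first window result appears at 10:28:25.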