# How to Build an Apache Flink Application
## 1. Introduction
### 1.1 Apache Flink Overview
Apache Flink is an open-source distributed stream processing framework developed and maintained by the Apache Software Foundation. It originated as a research project at TU Berlin, entered the Apache Incubator in 2014, and became a top-level Apache project in 2015. Flink's core design philosophy is "streaming-first": batch processing is treated as a special case of stream processing, enabling genuinely unified batch and stream processing.
### 1.2 Core Features of Flink
- **Low latency, high throughput**: sustains high throughput at millisecond-level latency
- **Exactly-once** state consistency guarantees
- **Event-time processing** with a watermark mechanism
- **State management**: efficient storage and access for very large state
- **Fault tolerance**: distributed snapshots based on the Chandy-Lamport algorithm
### 1.3 Typical Use Cases
- Real-time ETL and data pipelines
- Real-time monitoring and anomaly detection
- Event-driven applications
- Complex event processing (CEP)
- Real-time inference for machine learning models
## 2. Environment Setup and Project Creation
### 2.1 System Requirements
- **Java**: JDK 8 or 11 (OpenJDK recommended)
- **Maven**: 3.0+ (build tool)
- **IDE**: IntelliJ IDEA or Eclipse (the former is recommended)
- **Cluster environment** (optional): Standalone/YARN/Kubernetes
### 2.2 Creating a Maven Project
Add the core dependencies to your `pom.xml`:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- since Flink 1.15 this artifact no longer carries a Scala version suffix -->
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
```
With the dependencies in place, create and configure an execution environment:
```java
// create a local execution environment with the web UI
// (serving the UI also requires flink-runtime-web on the classpath)
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .createLocalEnvironmentWithWebUI(new Configuration());
// set the default parallelism
env.setParallelism(4);
```
## 3. Application Structure
A Flink program is a pipeline from sources through transformations to sinks:
```mermaid
graph TD
    A[DataSource] --> B[Transformation]
    B --> C[Sink]
```
The classic streaming WordCount illustrates this structure:
```java
DataStream<String> text = env.socketTextStream("localhost", 9999);

DataStream<Tuple2<String, Integer>> counts = text
    .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : value.split("\\s")) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    // lambdas with generic outputs lose type information to erasure,
    // so the result type must be declared explicitly
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .sum(1);

counts.print();
env.execute("Socket WordCount");
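```
To try this locally, start a text server with `nc -lk 9999` in a terminal, run the job, and type some words; the per-word counts appear on stdout.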
## 4. Data Sources and Sinks
Flink ships with several built-in sources:

| Type | Implementation | Description |
|---|---|---|
| File | `readTextFile` | Reads a text file |
| Socket | `socketTextStream` | Network socket |
| Collection | `fromCollection` | In-memory collection data |
| Kafka | `FlinkKafkaConsumer` | Kafka message queue |
A custom source implements `SourceFunction`:
```java
public class CustomSource implements SourceFunction<String> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit one synthetic event per second until cancelled
        while (isRunning) {
            ctx.collect("event-" + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
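Attaching it to the environment is a one-liner:
```java
DataStream<String> events = env.addSource(new CustomSource());
```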
On the sink side, the JDBC connector writes results to an external database:
```java
// JDBC sink example
counts.addSink(JdbcSink.sink(
    "INSERT INTO word_counts (word, count) VALUES (?, ?)",
    (ps, t) -> {
        ps.setString(1, t.f0);
        ps.setInt(2, t.f1);
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/test")
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("user")
        .withPassword("pass")
        .build()
));
```
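An overload of `JdbcSink.sink` additionally accepts `JdbcExecutionOptions` to control batching and retries; a sketch with illustrative values:
```java
JdbcExecutionOptions execOptions = JdbcExecutionOptions.builder()
    .withBatchSize(100)          // flush after 100 buffered rows...
    .withBatchIntervalMs(2000)   // ...or after 2 seconds, whichever comes first
    .withMaxRetries(3)           // retry transient write failures
    .build();
```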
## 5. Transformations
Common element-wise transformations:
```java
// map: exactly one output per input
DataStream<Integer> lengths = text.map(String::length);

// filter: keep only matching elements
DataStream<String> filtered = text.filter(s -> s.startsWith("A"));

// flatMap: zero or more outputs per input
DataStream<String> words = text.flatMap((String value, Collector<String> out) -> {
    for (String word : value.split(" ")) {
        out.collect(word);
    }
}).returns(Types.STRING);
```
Grouping, aggregation, and multi-stream operations:
```java
DataStream<Tuple2<String, Double>> sales = ...;

// keyBy: partition the stream by key
KeyedStream<Tuple2<String, Double>, String> keyedSales = sales.keyBy(t -> t.f0);

// reduce: rolling aggregation per key
DataStream<Tuple2<String, Double>> totalSales = keyedSales
    .reduce((x, y) -> new Tuple2<>(x.f0, x.f1 + y.f1));

// union: merge streams of the same type
DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;
DataStream<String> unioned = stream1.union(stream2);

// connect: pair two streams while keeping their types separate
ConnectedStreams<String, String> connected = stream1.connect(stream2);
```
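A connected stream is consumed with a Co-function that keeps separate logic per input; a minimal sketch:
```java
DataStream<String> merged = connected.map(new CoMapFunction<String, String, String>() {
    @Override
    public String map1(String value) {  // elements from stream1
        return "first: " + value;
    }

    @Override
    public String map2(String value) {  // elements from stream2
        return "second: " + value;
    }
});
```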
## 6. State Management
Keyed state lets an operator remember values per key across events. The example below emits the average of every two values per key using `ValueState`:
```java
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>("average",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // the state is null until the first update for this key
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }
        currentSum.f0 += 1;        // element count
        currentSum.f1 += input.f1; // running sum
        sum.update(currentSum);

        // emit the average of every 2 elements, then reset
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
}
```
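A minimal local usage sketch (the printed output follows from the logic above):
```java
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L))
    .keyBy(t -> t.f0)
    .flatMap(new CountWindowAverage())
    .print();
// after the first two elements for key 1, prints (1,4)
```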
## 7. Checkpointing and Fault Tolerance
```java
// enable checkpointing (every 10 seconds)
env.enableCheckpointing(10000);

// advanced configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
// retain checkpoint files when the job is cancelled,
// so the job can later be restarted from them
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
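A job can then be resumed from a retained checkpoint via the CLI, e.g. `flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42 ...` (the path here is illustrative).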
## 8. Event Time and Watermarks
```java
DataStream<Event> events = ...;

// built-in bounded-out-of-orderness watermarks
DataStream<Event> withTimestamps = events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

// custom watermark generator
public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
    private final long maxOutOfOrderness = 5000; // 5 seconds
    private long currentMaxTimestamp;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // track the highest timestamp seen so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit a watermark that lags the max timestamp by the allowed lateness
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}
```
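The custom generator plugs in through `WatermarkStrategy.forGenerator`:
```java
DataStream<Event> customWm = events.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
        .withTimestampAssigner((event, ts) -> event.getTimestamp()));
```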
## 9. Windows
```java
// tumbling window: fixed, non-overlapping 5-second windows
dataStream.keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1);

// sliding window: 10-second windows evaluated every 5 seconds
dataStream.keyBy(t -> t.f0)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .sum(1);

// session window: closes after 5 minutes of inactivity per key
dataStream.keyBy(t -> t.f0)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .sum(1);
```
## 10. Async I/O
External lookups such as database queries can be issued asynchronously so they do not block the pipeline:
```java
DataStream<String> enriched = AsyncDataStream.unorderedWait(
    inputStream,
    new AsyncDatabaseRequest(),
    1000,                  // timeout
    TimeUnit.MILLISECONDS,
    100                    // max in-flight requests
);

class AsyncDatabaseRequest extends RichAsyncFunction<String, String> {
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // DB_URL is assumed to be a constant defined elsewhere
        connection = DriverManager.getConnection(DB_URL);
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // run the blocking JDBC query on a separate thread
        CompletableFuture.supplyAsync(() -> {
            // SQL is assumed to be a parameterized query defined elsewhere
            try (PreparedStatement stmt = connection.prepareStatement(SQL)) {
                stmt.setString(1, key);
                ResultSet rs = stmt.executeQuery();
                rs.next();
                return rs.getString(1);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        // ResultFuture.complete expects a collection of results
        }).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }
}
```
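`AsyncDataStream.orderedWait` is the alternative when result order must match input order, at some latency cost. Note that by default a timed-out request fails the job unless `AsyncFunction#timeout` is overridden.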
## 11. Side Outputs
```java
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected"){};

SingleOutputStreamOperator<String> mainStream = processStream
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (value.startsWith("A")) {
                out.collect(value);             // main output
            } else {
                ctx.output(rejectedTag, value); // side output
            }
        }
    });

DataStream<String> rejected = mainStream.getSideOutput(rejectedTag);
```
## 12. Configuration and Tuning
```yaml
# flink-conf.yaml example
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8
taskmanager.memory.process.size: 4096m
jobmanager.memory.process.size: 2048m
```
Note that the available slots must cover the default parallelism: with 4 slots per TaskManager and a default parallelism of 8, at least two TaskManagers are required. For serialization-heavy jobs, custom serializers can be provided by implementing `org.apache.flink.api.common.typeutils.TypeSerializer`.
The choice of state backend determines where operator state lives:
```java
// RocksDB state backend (suited to very large state)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true));

// FsStateBackend (suited to small and medium state)
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
```
## 13. Example Applications
### 13.1 Real-Time Fraud Detection
```java
DataStream<Transaction> transactions = ...;

// rule 1: alert on large transactions
transactions
    .filter(t -> t.getAmount() > 10000)
    .addSink(new AlertSink());

// rule 2: detect high-frequency trading per account
transactions
    .keyBy(Transaction::getAccountId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new FraudDetector());
```
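`FraudDetector` is not defined in the original; a minimal sketch, assuming it flags accounts exceeding a per-minute transaction threshold:
```java
// hypothetical sketch of a per-account frequency check
public class FraudDetector extends ProcessWindowFunction<Transaction, String, String, TimeWindow> {
    private static final int MAX_TX_PER_MINUTE = 10; // assumed threshold

    @Override
    public void process(String accountId, Context ctx,
                        Iterable<Transaction> txs, Collector<String> out) {
        int count = 0;
        for (Transaction t : txs) {
            count++;
        }
        if (count > MAX_TX_PER_MINUTE) {
            out.collect("High-frequency alert for account " + accountId + ": " + count + " tx/min");
        }
    }
}
```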
### 13.2 User Behavior Analytics
```java
// stream of user behavior events
DataStream<UserBehavior> behaviors = ...;

// per-item click counts over a sliding window
behaviors
    .filter(b -> b.getType().equals("click"))
    .keyBy(UserBehavior::getItemId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .aggregate(new CountAgg(), new WindowResultFunction());

// per-session analysis for real-time association rules
behaviors
    .keyBy(UserBehavior::getSessionId)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .process(new SessionAnalysis());
```
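The helper `CountAgg` is not shown in the original; a plausible sketch as an `AggregateFunction` that counts events per key within the window:
```java
// hypothetical sketch: count events per key within the window
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(UserBehavior value, Long acc) { return acc + 1; }

    @Override
    public Long getResult(Long acc) { return acc; }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}
```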
## 14. Connector Dependencies
```xml
<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
```