# How to Use Flink's DataStream and DataSet APIs
As a new-generation distributed big data processing framework, Apache Flink gives developers powerful data processing capabilities through its two core abstractions: DataStream (streaming data) and DataSet (batch data). This article walks through how to use both and the practices that work well in real projects.
## 1. Overview of Flink's Data Processing Model
### 1.1 Unified Stream and Batch Architecture
Apache Flink uses a unified stream-batch architecture: a single engine supports two processing modes:
- **DataStream API**: processes unbounded data streams (stream processing mode)
- **DataSet API**: processes bounded data sets (batch processing mode)
```java
// Stream processing environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Batch processing environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
```
### 1.2 Comparing DataStream and DataSet
| Feature | DataStream | DataSet |
|---|---|---|
| Data model | Unbounded or bounded streams | Bounded data sets |
| Execution mode | Streaming or batch execution | Batch execution only |
| Fault tolerance | Checkpointing | Re-execution |
| Performance profile | Low latency | High throughput |
| Typical use cases | Real-time monitoring, CEP | Offline analytics, ETL |
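As a brief illustration of the checkpoint-based fault tolerance noted in the table, a streaming job enables periodic checkpoints on its environment. This is only a minimal sketch; the 10-second interval and explicit exactly-once mode are arbitrary example choices:
```java
// Take a checkpoint every 10 seconds (interval chosen only for illustration)
env.enableCheckpointing(10_000);
// Keep the default exactly-once checkpointing semantics explicit
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```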
## 2. Using the DataStream API
### 2.1 Basic programming model
A minimal streaming WordCount shows the typical structure of a DataStream program: create the environment, attach a source, apply transformations, add a sink, and trigger execution.
```java
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism
env.setParallelism(4);
// Attach a data source (Kafka in this example)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));
// Transformations
DataStream<Tuple2<String, Integer>> processed = stream
        .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = value.split("\\s+");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        })
        // Lambdas lose generic type information to erasure, so declare it explicitly
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(value -> value.f0)
        .sum(1);
// Sink
processed.print();
// Trigger execution
env.execute("Streaming WordCount");
```
### 2.2 Data sources
```java
// 1. Collection-based source
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4);
// 2. File source
DataStream<String> fileStream = env.readTextFile("hdfs://path/to/file");
// 3. Socket source
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// 4. Custom source
DataStream<Event> customSource = env.addSource(new SourceFunction<Event>() {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(generateEvent());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
});
```
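The Event type and the generateEvent() helper used by the custom source are not defined in the original text; a minimal hypothetical placeholder could look like this:
```java
// Hypothetical event POJO assumed by the custom-source example
public static class Event {
    public long timestamp;
    public String payload;

    public Event() {}

    public Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

// Hypothetical helper that fabricates one event per call
private static Event generateEvent() {
    return new Event(System.currentTimeMillis(), "demo");
}
```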
### 2.3 Common transformations
```java
// Map: one-to-one transformation
DataStream<Integer> squared = intStream.map(x -> x * x);
// FlatMap: one-to-many transformation
DataStream<String> words = lines.flatMap(
        (String line, Collector<String> out) -> {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        })
        // Declare the output type explicitly; erasure hides it from the lambda
        .returns(Types.STRING);
// Filter: drop records that do not match the predicate
DataStream<Integer> evens = intStream.filter(x -> x % 2 == 0);
// KeyBy: logical partitioning by key
KeyedStream<Tuple2<String, Integer>, String> keyed = pairs.keyBy(value -> value.f0);
// Reduce: rolling aggregation per key
DataStream<Tuple2<String, Integer>> wordCounts = keyed.reduce(
        (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
```
### 2.4 Working with keyed state
```java
// A RichFlatMapFunction that keeps a per-key counter in keyed ValueState
public static class CounterMapper
        extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("counter", Integer.class);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        Integer current = state.value();
        if (current == null) {
            current = 0;
        }
        current++;
        state.update(current);
        out.collect(new Tuple2<>(value, current));
    }
}
```
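Keyed state is only available on a keyed stream, so the function has to be applied after a keyBy. A minimal sketch of wiring it up, assuming a DataStream<String> named stream as in the earlier examples:
```java
// Key by the word itself, then count occurrences per key with the stateful function
DataStream<Tuple2<String, Integer>> counted = stream
        .keyBy(value -> value)
        .flatMap(new CounterMapper());
```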
### 2.5 Event time and watermarks
```java
// Use event time as the time characteristic (legacy API, deprecated since Flink 1.12)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Assign timestamps and watermarks, tolerating 5 seconds of out-of-orderness
// (assumes stream is a DataStream<Event> here)
DataStream<Event> timedStream = stream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Event element) {
                        return element.getTimestamp();
                    }
                });
```
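On newer Flink versions (1.11+), the same behavior is usually expressed with the WatermarkStrategy API instead of the deprecated extractor above. A rough equivalent sketch, assuming the same Event type with a getTimestamp() accessor:
```java
// Bounded-out-of-orderness watermarks with a 5-second tolerance,
// pulling the event timestamp out of the Event object itself
DataStream<Event> timedStream = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()));
```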
## 3. Using the DataSet API
### 3.1 Basic programming model
```java
// Create the batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read a text file
DataSet<String> text = env.readTextFile("hdfs://path/to/file");
// Transformations
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new LineSplitter())
        .groupBy(0)
        .sum(1);
// Write the result
counts.writeAsCsv("hdfs://path/to/output", "\n", " ");
// Trigger execution
env.execute("Batch WordCount");
```
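The LineSplitter referenced above is not shown in the original article; a minimal version of what it presumably looks like, a tokenizing FlatMapFunction, is:
```java
// Splits each line on whitespace and emits (word, 1) pairs
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```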
### 3.2 Sources and sinks
```java
// 1. Collection-based source
DataSet<String> letters = env.fromCollection(
        Arrays.asList("A", "B", "C", "D"));
// 2. Recursive directory reading (the flag must be set for nested files to be read)
Configuration recursiveConfig = new Configuration();
recursiveConfig.setBoolean("recursive.file.enumeration", true);
DataSet<String> recursiveFiles = env.readTextFile("hdfs://path/to/folder")
        .withParameters(recursiveConfig);
// 3. Writing to a relational database via the legacy JDBCOutputFormat builder
//    (the format consumes Row records, so the tuples are mapped to Rows first)
counts.map(t -> Row.of(t.f0, t.f1))
        .returns(Types.ROW(Types.STRING, Types.INT))
        .output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/test")
                .setQuery("INSERT INTO word_counts (word, count) VALUES (?, ?)")
                .finish());
```
### 3.3 Joins, CoGroup, and iterations
```java
// Join two data sets on their first field
DataSet<Tuple2<Integer, String>> users = ...;
DataSet<Tuple2<Integer, String>> transactions = ...;
DataSet<Tuple4<Integer, String, Integer, String>> result =
        users.join(transactions)
             .where(0)      // key of users
             .equalTo(0)    // key of transactions
             .with(new JoinFunction<...>() {
                 // custom join logic
             });
```
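As a sketch of what the elided JoinFunction could look like for the declared result type (purely illustrative, matching the tuple types above):
```java
// Combines a (userId, userName) record with a (userId, txInfo) record
new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                 Tuple4<Integer, String, Integer, String>>() {
    @Override
    public Tuple4<Integer, String, Integer, String> join(
            Tuple2<Integer, String> user, Tuple2<Integer, String> tx) {
        return new Tuple4<>(user.f0, user.f1, tx.f0, tx.f1);
    }
}
```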
```java
// CoGroup: group both inputs by key and process each key's groups together
DataSet<Tuple3<Integer, String, String>> coGrouped =
        users.coGroup(transactions)
             .where(0)
             .equalTo(0)
             .with(new CoGroupFunction<...>() {
                 // custom grouping/aggregation logic
             });
```
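A hypothetical CoGroupFunction body matching that result type might pair each user with the transactions that share its key; again, this is only a sketch of the elided logic:
```java
// Emits one record per user/transaction pair found under the same key;
// unlike join, coGroup also sees keys that exist on only one side
new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                    Tuple3<Integer, String, String>>() {
    @Override
    public void coGroup(Iterable<Tuple2<Integer, String>> userGroup,
                        Iterable<Tuple2<Integer, String>> txGroup,
                        Collector<Tuple3<Integer, String, String>> out) {
        for (Tuple2<Integer, String> user : userGroup) {
            for (Tuple2<Integer, String> tx : txGroup) {
                out.collect(new Tuple3<>(user.f0, user.f1, tx.f1));
            }
        }
    }
}
```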
```java
// Bulk iteration: run the step function at most 10 times
IterativeDataSet<Integer> initial = env.fromElements(0, 1, 2).iterate(10);
DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value + 1;
    }
});
// Close the loop: feed the step result back as the next iteration's input
DataSet<Integer> iterationResult = initial.closeWith(iteration);
```
### 3.4 Partitioning strategies
```java
// 1. Hash partitioning on field 0
DataSet<Tuple2<String, Integer>> hashPartitioned = data.partitionByHash(0);
// 2. Range partitioning on field 1
DataSet<Tuple2<String, Integer>> rangePartitioned = data.partitionByRange(1);
// 3. Custom partitioning on field 1 (the Integer field matching the Partitioner's key type)
data.partitionCustom(new Partitioner<Integer>() {
    @Override
    public int partition(Integer key, int numPartitions) {
        // floorMod keeps the partition index non-negative even for negative keys
        return Math.floorMod(key, numPartitions);
    }
}, 1);
```
### 3.5 Broadcast variables
```java
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
    private List<Integer> broadcastData;

    @Override
    public void open(Configuration parameters) {
        broadcastData = getRuntimeContext().getBroadcastVariable("broadcastSet");
    }

    @Override
    public String map(String value) {
        return value + broadcastData.toString();
    }
}).withBroadcastSet(toBroadcast, "broadcastSet");
```
## 4. Table API, SQL, and Stream-Batch Unification
```java
// Stream environment plus a table environment on top of it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register a DataStream as a table (reusing the Kafka consumer pattern from earlier)
tableEnv.createTemporaryView("orders",
        env.addSource(new FlinkKafkaConsumer<>(...)));
// Run a SQL query
Table result = tableEnv.sqlQuery(
        "SELECT product, COUNT(*) FROM orders GROUP BY product");
// Convert back to a DataStream; the aggregation produces updates,
// so a retract stream is needed rather than an append-only stream
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
```
The same business logic can be shared between streaming and batch jobs by factoring it into reusable functions (MyFlatMapFunction here plays the same role as the LineSplitter shown earlier):
```java
public static DataStream<Tuple2<String, Integer>> processStream(DataStream<String> input) {
    return input.flatMap(new MyFlatMapFunction())
                .keyBy(value -> value.f0)
                .sum(1);
}

public static DataSet<Tuple2<String, Integer>> processBatch(DataSet<String> input) {
    return input.flatMap(new MyFlatMapFunction())
                .groupBy(0)
                .sum(1);
}
```
## 5. Production Configuration and Tuning
```yaml
# flink-conf.yaml example
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8
# State backend configuration
state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.savepoints.dir: hdfs://namenode:8020/flink/savepoints
```
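The same settings can also be applied per job in code. A minimal sketch using the Flink 1.13+ state backend API; the HDFS path is the same example value used in flink-conf.yaml above:
```java
// Use RocksDB for keyed/operator state
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// Store checkpoint data in HDFS (same example path as in flink-conf.yaml)
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
```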
In production, keep an eye on the key monitoring metrics and the usual performance problems; data skew, for instance, can often be mitigated with rebalance() or a custom partitioner.
## 6. API Evolution and Outlook
The DataStream API continues to gain capabilities, while the DataSet API is gradually being superseded by batch execution on the DataStream API.
The clear trend is toward a single, unified API:
```java
// Unified API example (Flink 1.14+)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Switch the runtime to batch execution mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// The same DataStream API can now process both streaming and batch data
```
## Conclusion
This article has walked through the Flink DataStream and DataSet APIs in a systematic way. For real projects, the general recommendations are:
1. Prefer the DataStream API for real-time scenarios.
2. For batch workloads, choose between DataSet and batch-mode DataStream depending on your Flink version.
3. For new projects, adopt the unified API to get stream-batch unification from the start.
As the Flink community continues to evolve, the unified stream-batch programming model is becoming mainstream, so developers should keep an eye on how the APIs develop.
---
**Further reading**:
- [Flink official documentation](https://flink.apache.org/)
- [Flink Stateful Functions design](https://github.com/apache/flink-statefun)
- [Flink CDC connectors](https://ververica.github.io/flink-cdc-connectors/)