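# How to Read and Write txt Files with Flink Connectors
## 1. Introduction
Apache Flink is an open-source framework that unifies stream and batch processing, and its core strength is the efficient processing of real-time data streams. In real-world business scenarios, however, plain text files (txt) are still a common way to carry data. This article explains how to read and write txt files with Flink's file connectors, from basic API usage to more advanced configuration.
## 2. Environment Setup
### 2.1 Dependencies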
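Add the following dependencies to your Maven project. The examples use `TextLineInputFormat`, which is the class name in Flink 1.15 and later (earlier releases call it `TextLineFormat`), so `${flink.version}` should be 1.15 or later: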
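```xml
<!-- FileSource / FileSink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- DataStream API (StreamExecutionEnvironment, DataStream) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
```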
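### 2.2 Execution Environment

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // parallelism 1 makes debugging easier during development
```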
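## 3. Reading txt Files

### 3.1 readTextFile (legacy API)

```java
// Each line of the file becomes one String record.
// readTextFile is deprecated in newer Flink versions; prefer FileSource.
DataStream<String> textData = env.readTextFile("input/example.txt");
textData.print();
```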
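### 3.2 FileSource (recommended)

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Bounded source: reads the file line by line (UTF-8 by default)
FileSource<String> source = FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("input/example.txt")
).build();

DataStream<String> lines = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(), // plain text lines carry no event timestamps
        "file-source"
);
```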
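Conceptually, `TextLineInputFormat` turns each line of a file into one `String` record without its line terminator, decoding bytes as UTF-8 by default. The plain-Java sketch below mimics that behaviour with `BufferedReader.readLine()` so it can be tried without a Flink cluster. It is not Flink's own code, and the class name `LineSplitSketch` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineSplitSketch {
    private LineSplitSketch() {}

    /** Splits raw file bytes into records, one per line, like a line-based text format. */
    static List<String> readLines(byte[] fileContent) {
        List<String> records = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(fileContent), StandardCharsets.UTF_8))) {
            String line;
            // readLine() strips "\n", "\r" and "\r\n" terminators
            while ((line = reader.readLine()) != null) {
                records.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

For example, the bytes of `"first\nsecond\r\nthird"` yield three records, regardless of the mixed line endings.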
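### 3.3 Continuously Monitoring a Directory

```java
import java.time.Duration;

// Unbounded source: re-scans the input/ directory for new files
FileSource<String> monitoringSource = FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("input/")
)
        .monitorContinuously(Duration.ofSeconds(5)) // check for new files every 5 seconds
        .build();
```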
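With `monitorContinuously`, the source periodically lists the directory again and only hands out files it has not seen before. Files are tracked by path, so a file that is appended to after it was picked up is not read again. The sketch below models only that bookkeeping in plain Java; `NewFileTracker` and `discover` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class NewFileTracker {
    // Paths that have already been handed out for reading
    private final Set<String> alreadyProcessed = new HashSet<>();

    /** Given one directory listing, returns only the paths not seen in earlier listings. */
    List<String> discover(List<String> currentListing) {
        List<String> newFiles = new ArrayList<>();
        for (String path : currentListing) {
            // Set.add returns false for a path that was already recorded
            if (alreadyProcessed.add(path)) {
                newFiles.add(path);
            }
        }
        return newFiles;
    }
}
```

Calling `discover` every 5 seconds with a fresh listing roughly corresponds to `monitorContinuously(Duration.ofSeconds(5))`.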
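## 4. Writing txt Files

### 4.1 writeAsText (legacy API)

```java
DataStream<String> outputData = ...; // your data stream
// Deprecated in newer Flink versions and does not take part in checkpointing; prefer FileSink
outputData.writeAsText("output/result.txt");
```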
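### 4.2 FileSink with a Rolling Policy

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;

final FileSink<String> sink = FileSink.forRowFormat(
                new Path("output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(15))  // roll at least every 15 minutes
                        .withInactivityInterval(Duration.ofMinutes(5)) // roll after 5 minutes without writes
                        .withMaxPartSize(MemorySize.ofMebiBytes(128))  // roll once a part exceeds 128 MiB
                        .build())
        .build();

// In STREAMING mode, part files are only finalized on checkpoints, so enable checkpointing
outputData.sinkTo(sink);
```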
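### 4.3 Rolling Policy Parameters

| Parameter | Description | Example value |
|---|---|---|
| `withRolloverInterval` | Maximum time a part file stays open before it is rolled | `Duration.ofHours(1)` |
| `withInactivityInterval` | Roll the part file if nothing has been written to it for this long | `Duration.ofMinutes(10)` |
| `withMaxPartSize` | Maximum size of a part file before it is rolled | 128 MB |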
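The three parameters combine as an OR: a part file is rolled as soon as any one condition holds. The sketch below is a simplified plain-Java model of that decision, written after the checks in Flink's `DefaultRollingPolicy`. The size is checked when records arrive, and the two intervals are checked periodically against processing time. `RollingDecisionSketch` is a hypothetical class, and all times are assumed to be epoch milliseconds.

```java
final class RollingDecisionSketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingDecisionSketch(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    /** Size check, evaluated when a new record arrives. */
    boolean shouldRollOnEvent(long currentPartSizeBytes) {
        return currentPartSizeBytes > maxPartSizeBytes;
    }

    /** Time checks, evaluated periodically against the current processing time. */
    boolean shouldRollOnProcessingTime(long nowMs, long partCreatedMs, long lastWriteMs) {
        boolean tooOld = nowMs - partCreatedMs >= rolloverIntervalMs;
        boolean idle = nowMs - lastWriteMs >= inactivityIntervalMs;
        return tooOld || idle;
    }
}
```

With the values from the FileSink example (128 MiB, 15 minutes, 5 minutes), a part file created at time 0 and last written at minute 4 is rolled at minute 10 because it has been idle for 6 minutes, even though it is neither too large nor too old.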
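## 5. Parsing Text Data

### 5.1 Custom Delimited Format

```java
import org.apache.flink.api.common.functions.MapFunction;

// POJO is a placeholder for your own record class with a (String, int) constructor
public class CustomParser implements MapFunction<String, POJO> {
    @Override
    public POJO map(String value) {
        String[] parts = value.split("\\|"); // "|" must be escaped: split() takes a regex
        return new POJO(parts[0], Integer.parseInt(parts[1]));
    }
}
```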
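`parts[1]` and `Integer.parseInt` both throw on a malformed line, and that exception would fail the Flink job. The plain-Java sketch below applies the same split logic with a guard, so the parsing can be unit-tested without Flink. `PipeLineParser` and `PipeRecord` are hypothetical names standing in for the `POJO` placeholder. Returning `Optional.empty()` for bad lines is a design assumption.

```java
import java.util.Optional;

final class PipeLineParser {
    /** Hypothetical stand-in for the POJO placeholder: a name and an integer value. */
    static final class PipeRecord {
        final String name;
        final int value;

        PipeRecord(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    private PipeLineParser() {}

    /** Parses "name|value"; returns empty instead of throwing on malformed input. */
    static Optional<PipeRecord> parse(String line) {
        // "\\|" escapes the pipe, which is a regex metacharacter in String.split
        String[] parts = line.split("\\|");
        if (parts.length < 2) {
            return Optional.empty(); // missing delimiter or empty value (trailing "" is dropped)
        }
        try {
            return Optional.of(new PipeRecord(parts[0], Integer.parseInt(parts[1])));
        } catch (NumberFormatException e) {
            return Optional.empty(); // value is not an integer
        }
    }
}
```

In a Flink job, this logic would fit in a `flatMap` that emits only present values, so malformed lines are skipped instead of crashing the job.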
對于JSON格式的文本行:
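```java
import org.json.JSONException;
import org.json.JSONObject;

// Assumes the org.json library (JSONObject / JSONException) is on the classpath
DataStream<JSONObject> jsonData = textData.map(line -> {
    try {
        return new JSONObject(line);
    } catch (JSONException e) {
        // Malformed lines become empty objects; consider filtering them out instead
        return new JSONObject();
    }
});
```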
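Returning an empty `JSONObject` hides bad input among valid records. An alternative is to keep rejected lines in a separate stream. Flink supports this with side outputs (an `OutputTag` plus `ctx.output(...)` inside a `ProcessFunction`). The plain-Java sketch below models only the split into accepted and rejected lines with a pluggable parser. `ValidatingSplitter` is a hypothetical name, and `Integer::valueOf` stands in for a JSON parser so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ValidatingSplitter<T> {
    final List<T> accepted = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();
    private final Function<String, T> parser;

    ValidatingSplitter(Function<String, T> parser) {
        this.parser = parser;
    }

    /** Parses one line; lines whose parser throws go to the rejected list instead. */
    void accept(String line) {
        try {
            accepted.add(parser.apply(line));
        } catch (RuntimeException e) {
            // org.json's JSONException is also a RuntimeException
            rejected.add(line);
        }
    }
}
```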
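## 6. Performance Tuning

### 6.1 Parallelism

```java
// Read stage: FileSource itself has no setParallelism();
// set it on the stream returned by env.fromSource(...).
// TextLineInputFormat is not splittable, so each file is read by a single reader.
FileSource<String> source = ...;
DataStream<String> lines = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
        .setParallelism(4);

// Write stage: set parallelism on the sink operator returned by sinkTo(...)
FileSink<String> sink = ...;
lines.sinkTo(sink).setParallelism(2);
```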
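### 6.2 Output File Naming

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

// Finished part files will be named batch-<uid>-<index>.tmp
OutputFileConfig config = OutputFileConfig.builder()
        .withPartPrefix("batch")
        .withPartSuffix(".tmp")
        .build();

FileSink<String> sink = FileSink.forRowFormat(...)
        .withOutputFileConfig(config)
        .build();
```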
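The Flink documentation describes finished part files as `<prefix>-<uid>-<partFileIndex><suffix>`, where the default prefix is `part` and the default suffix is empty. The helper below only formats such a name for illustration. `PartFileNames` is a hypothetical class, and Flink generates the uid and the counter itself. With the configuration above, note that finished files end in `.tmp`, which may confuse downstream tools that treat `.tmp` files as incomplete.

```java
final class PartFileNames {
    private PartFileNames() {}

    /** Formats a finished part-file name as prefix-uid-index followed by the suffix. */
    static String finishedName(String prefix, String uid, long partFileIndex, String suffix) {
        return prefix + "-" + uid + "-" + partFileIndex + suffix;
    }
}
```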
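## 7. Fault Tolerance

### 7.1 Validating the File Format

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import java.io.IOException;

// TextLineInputFormat (a FileSource StreamFormat) has no open(FileInputSplit) hook;
// that method belongs to the legacy FileInputFormat API, e.g. TextInputFormat.
// validHeader(Path) is a user-defined static helper; path is the input Path.
TextInputFormat format = new TextInputFormat(path) {
    @Override
    public void open(FileInputSplit split) throws IOException {
        // Verify the file header before reading this split
        if (!validHeader(split.getPath())) {
            throw new IOException("Invalid file format: " + split.getPath());
        }
        super.open(split);
    }
};
DataStream<String> lines = env.readFile(format, path.toString());
```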
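`validHeader` is not defined in the snippet. One possible implementation reads only the first line of the file and compares it with an expected header. The header value and the class name `HeaderCheck` are assumptions. The check takes an `InputStream`, so the same logic works for any file system the caller can open.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class HeaderCheck {
    private HeaderCheck() {}

    /** Returns true if the first line of the stream equals the expected header. */
    static boolean hasHeader(InputStream in, String expectedHeader) {
        try {
            // The caller owns (and closes) the stream
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String firstLine = reader.readLine(); // null for an empty file
            return expectedHeader.equals(firstLine);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside `validHeader(Path path)`, the file could be opened through Flink's file system abstraction with `path.getFileSystem().open(path)`, which also covers `hdfs://` paths. The resulting stream would be passed to `hasHeader` and closed afterwards.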
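### 7.2 Restart Strategy

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;
// Deprecated in recent Flink releases in favor of the restart-strategy.* config options
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                             // maximum number of restart attempts
        Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
```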
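`fixedDelayRestart(3, ...)` lets Flink restart the job up to 3 times, waiting the given delay before each attempt. If the job fails again after the last restart, it fails for good. The plain-Java retry loop below mirrors that counting: one initial run plus at most `maxRestarts` restarts. It is an analogy for illustration, not how Flink implements restarts, and `FixedDelayRetry` is a hypothetical name.

```java
import java.util.function.Supplier;

final class FixedDelayRetry {
    private FixedDelayRetry() {}

    /** Runs the task once, then restarts it up to maxRestarts times after a fixed delay. */
    static <T> T run(Supplier<T> task, int maxRestarts, long delayMillis) {
        int restarts = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (restarts >= maxRestarts) {
                    throw e; // restart budget exhausted: give up
                }
                restarts++;
                sleep(delayMillis); // fixed delay before the next attempt
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", ie);
        }
    }
}
```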
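## 8. Practical Cases

### 8.1 Log Analysis

```java
DataStream<LogEntry> logs = env.readTextFile("logs/access.log")
        .filter(line -> !line.startsWith("#")) // skip comment lines
        .map(new LogParser())                  // custom log parser
        // Event-time windows only fire with timestamps and watermarks; getTimestamp()
        // (epoch millis) is an assumed LogEntry accessor and 5 s is an example bound
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((entry, ts) -> entry.getTimestamp()));

logs.keyBy(entry -> entry.getStatusCode())
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new ErrorCounter())
        .sinkTo(FileSink.forRowFormat(...));
```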
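A 5-minute tumbling event-time window assigns each record to the window `[start, start + 5 min)`. For the default zero offset and non-negative timestamps, `start = timestamp - timestamp % size`. The sketch below reproduces that assignment and counts records per status code and window in plain Java. It assumes timestamps in epoch milliseconds and plays the role of the hypothetical `ErrorCounter`; it is not Flink's windowing runtime, and `TumblingWindowCounts` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowCounts {
    private TumblingWindowCounts() {}

    /** Start of the tumbling window containing the timestamp (zero offset, timestamp >= 0). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Counts events per "statusCode@windowStart" key. */
    static Map<String, Integer> count(int[] statusCodes, long[] timestampsMs, long windowSizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < statusCodes.length; i++) {
            String key = statusCodes[i] + "@" + windowStart(timestampsMs[i], windowSizeMs);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }
}
```

For example, with 5-minute windows, two `500` responses at 10 s and 250 s fall into the window starting at 0, while a third at 310 s opens the window starting at 300000 ms.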
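### 8.2 Data Cleansing (ETL)

```java
// fileSource is a FileSource<String>; DataCleanser is a user-defined ProcessFunction<String, String>
DataStream<String> rawData = env.fromSource(
        fileSource,
        WatermarkStrategy.noWatermarks(),
        "file-source"
).process(new DataCleanser());
rawData.sinkTo(FileSink.forRowFormat(...));
```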
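`DataCleanser` is not shown in the snippet. As one hypothetical set of cleansing rules, the sketch below trims whitespace and drops blank lines and lines starting with `#`, the same comment convention used in the log example. In Flink, the same logic would live in the `ProcessFunction`'s `processElement`, calling `out.collect(...)` only for kept lines. The rules and the class name `LineCleanser` are assumptions.

```java
import java.util.Optional;

final class LineCleanser {
    private LineCleanser() {}

    /** Returns the cleaned line, or empty if the line should be dropped. */
    static Optional<String> clean(String rawLine) {
        if (rawLine == null) {
            return Optional.empty();
        }
        String trimmed = rawLine.trim();
        // Drop blank lines and comment lines
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return Optional.empty();
        }
        return Optional.of(trimmed);
    }
}
```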
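## 9. Common Issues

- File paths: use the correct file system scheme for each path (`hdfs://` vs `file://`). A path without a scheme is resolved against the default file system (`fs.default-scheme`, the local file system unless configured otherwise).

## 10. Summary

This article covered:

- Using the FileSource/FileSink APIs for efficient file operations
- Configuration and tuning techniques for different scenarios
- Key points to watch in production environments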
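Flink's file connectors offer highly flexible interfaces. Developers can combine different strategies to meet their business requirements and build robust data processing pipelines.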
## Appendix: Complete Example Code
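```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class TxtFileProcessing {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The input is a bounded file: BATCH mode lets FileSink finalize
        // part files at the end of the job without checkpointing
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Source: read input/data.txt line by line
        FileSource<String> source = FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/data.txt"))
                .build();

        // Processing: convert each line to upper case
        DataStream<String> processed = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
                .map(line -> line.toUpperCase());

        // Sink: write rows to the output/ directory as UTF-8 text
        FileSink<String> sink = FileSink.forRowFormat(
                        new Path("output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                .build();

        processed.sinkTo(sink);
        env.execute("Txt File Processing");
    }
}
```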