# Spark Streaming運行流程是怎樣的
## 一、Spark Streaming概述
Spark Streaming是Apache Spark核心API的擴展,用于構建可擴展、高吞吐量、容錯的實時數據流處理系統。它能夠將來自Kafka、Flume、Kinesis等數據源的實時數據流進行高效處理,并以微批次(Micro-Batch)的方式進行處理,最終將結果輸出到文件系統、數據庫或實時儀表盤。
### 1.1 核心特點
- **微批處理架構**:將實時數據流切分為小批次(通常0.5~2秒),轉換為Spark的RDD進行處理
- **Exactly-Once語義**:通過檢查點(Checkpoint)和預寫日志(WAL)保證數據一致性
- **與Spark生態無縫集成**:可直接復用Spark的機器學習(MLlib)、圖計算(GraphX)等能力
- **多語言支持**:提供Scala、Java、Python API
### 1.2 基本概念
| 術語 | 說明 |
|---------------|----------------------------------------------------------------------|
| DStream | 離散化流(Discretized Stream),Spark Streaming的基礎抽象 |
| Batch Interval| 批次時間間隔(如1秒),決定微批次的劃分粒度 |
| Receiver | 數據接收器,負責從外部源獲取數據并存儲到Spark內存中 |
## 二、系統架構與核心組件
### 2.1 整體架構
```mermaid
graph TD
A[數據源] --> B[Spark Streaming]
B --> C{核心組件}
C --> D[Receiver]
C --> E[DStream]
C --> F[JobGenerator]
B --> G[輸出操作]
StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(1))
JobGenerator
spark.streaming.gracefulStopTimeout:優雅停止超時時間ReceiverTracker
Receiver
BlockGenerator
class BlockGenerator:
def __init__(self):
self.currentBuffer = []
self.blockInterval = 200ms # 默認值
創建StreamingContext
JavaStreamingContext jssc = new JavaStreamingContext(
new SparkConf().setAppName("NetworkWordCount"),
Durations.seconds(1)
);
定義輸入源(以Socket為例):
val lines = ssc.socketTextStream("localhost", 9999)
構建DStream轉換操作:
words = lines.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b)
Receiver啟動過程:
數據分塊存儲:
容錯機制:
ssc.checkpoint("hdfs://checkpoint_dir")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
時間窗口劃分:
val windowedWordCounts = wordCounts.window(Seconds(30), Seconds(10))
作業生成時序:
timeline
title 批次處理時序
section Batch Interval
批次N-1 : 2023-01-01 12:00:00
批次N : 2023-01-01 12:00:01
批次N+1 : 2023-01-01 12:00:02
任務執行階段:
常見輸出方式對比:
| 輸出方式 | 特點 | 示例代碼 |
|---|---|---|
| print() | 調試使用,打印前10條記錄 | wordCounts.print() |
| saveAsTextFiles() | 保存到HDFS | dstream.saveAsTextFiles("hdfs://output") |
| foreachRDD | 靈活自定義輸出邏輯 | 見下方代碼示例 |
foreachRDD典型用法:
wordCounts.foreachRDD(rdd -> {
rdd.foreachPartition(partition -> {
// 創建數據庫連接
Connection conn = createConnection();
while (partition.hasNext()) {
Tuple2<String, Integer> record = partition.next();
// 寫入數據庫
insertRecord(conn, record);
}
conn.close();
});
});
Executor配置:
并行度調整:
sc.setLogLevel("WARN") // 減少日志量
ssc.sparkContext.setCheckpointDir("/tmp")
| 參數名 | 推薦值 | 說明 |
|---|---|---|
| spark.streaming.blockInterval | 200ms | 數據塊生成間隔 |
| spark.streaming.receiver.maxRate | 10000 | 單個Receiver最大接收速率(條/秒) |
| spark.streaming.backpressure.enabled | true | 啟用反壓機制 |
數據積壓:
spark.dynamicAllocation.enabled=true
Receiver故障:
val kafkaStreams = (1 to 3).map(_ => KafkaUtils.createStream(...))
val unifiedStream = ssc.union(kafkaStreams)
| 特性 | Spark Streaming | Structured Streaming |
|---|---|---|
| 處理模型 | 微批處理 | 微批/持續處理 |
| API層級 | RDD級 | DataFrame/DataSet級 |
| 事件時間處理 | 需手動實現 | 原生支持 |
graph LR
A[Spark Streaming] --> B[兼容模式]
B --> C[完全切換]
Spark Streaming通過將流數據離散化為一系列小批次RDD,實現了: 1. 高吞吐量的實時處理能力 2. 與批處理統一編程模型 3. 強大的容錯機制
典型應用場景包括: - 實時監控告警系統 - 用戶行為實時分析 - IoT設備數據處理
隨著Spark 3.0的發布,雖然Structured Streaming成為主流,但理解Spark Streaming的運行機制仍是掌握Spark流處理體系的重要基礎。 “`
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。