# Storm中Parallelism的示例分析
## 目錄
1. [并行度(Parallelism)概述](#一并行度parallelism概述)
2. [Storm并行度核心概念](#二storm并行度核心概念)
3. [配置并行度的三種方式](#三配置并行度的三種方式)
4. [完整示例代碼分析](#四完整示例代碼分析)
5. [并行度調優策略](#五并行度調優策略)
6. [常見問題與解決方案](#六常見問題與解決方案)
7. [總結](#七總結)
---
## 一、并行度(Parallelism)概述
### 1.1 什么是并行度
在分布式流處理框架Storm中,**并行度(Parallelism)**指拓撲(Topology)中各個組件(Spout/Bolt)的并發執行能力。通過調整并行度參數,可以控制:
- 每個組件創建的**任務實例(Task)數量**
- 在集群中的**線程分配方式**
- 消息處理的**吞吐量上限**
### 1.2 并行度與性能的關系
```java
// 理想情況下吞吐量計算公式
Throughput = parallelism * (messages_processed_per_task / time_cost)
當并行度不足時會出現: - 消息積壓(Backpressure) - CPU利用率低 - 處理延遲增加
概念 | 說明 | 關聯參數 |
---|---|---|
Worker | JVM進程,運行拓撲的容器 | supervisor.slots |
Executor | 線程,運行一個或多個Task | setNumTasks() |
Task | 實際執行計算的實例 | setSpout()/setBolt() |
# storm.yaml 關鍵配置
supervisor.slots.ports:
- 6700
- 6701 # 每個端口對應一個Worker槽位
topology.workers: 3 # 總Worker數
topology.max.task.parallelism: 100 # 最大并行度
// 示例:單詞計數拓撲
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 4); // Spout并行度=4
builder.setBolt("split", new SplitSentenceBolt(), 8)
.setNumTasks(16) // Task總數=16
.shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 12)
.fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setNumWorkers(3); // 使用3個Worker進程
conf.setMaxTaskParallelism(100);
# 動態修改運行中拓撲的并行度
storm rebalance mytopology -n 5 -e spout=4 -e split=10
構建一個實時日志處理系統: 1. LogSpout:模擬生成日志消息 2. FilterBolt:過濾無效日志(并行度=4) 3. CountBolt:統計日志類型(并行度=6)
public class LogProcessingTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// Spout配置(并行度2,每個Executor運行1個Task)
builder.setSpout("log-spout", new LogSpout(), 2)
.setNumTasks(2);
// Filter Bolt(并行度4,共8個Task)
builder.setBolt("filter-bolt", new FilterBolt(), 4)
.setNumTasks(8)
.shuffleGrouping("log-spout");
// Count Bolt(并行度6,使用字段分組)
builder.setBolt("count-bolt", new CountBolt(), 6)
.fieldsGrouping("filter-bolt", new Fields("logType"));
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology("log-processor", conf, builder.createTopology());
}
}
graph TD
subgraph Worker1
A[Spout Executor1] --> B[Filter Executor1]
B --> C[Count Executor1]
end
subgraph Worker2
D[Spout Executor2] --> E[Filter Executor2-3]
E --> F[Count Executor2-3]
end
subgraph Worker3
G[Filter Executor4] --> H[Count Executor4-6]
end
capacity
值
// 使用自定義分組策略
builder.setBolt("bolt", new MyBolt(), 4)
.customGrouping("spout", new LogSizeAwareGrouping());
現象:部分Worker負載100%而其他空閑
解決方案:
# 限制Executor的Worker分布
conf.put(Config.TOPOLOGY_SPREAD_WORKERS, true);
場景:需要保證相同Key的消息順序處理
配置方法:
builder.setBolt("ordered-bolt", new OrderedBolt(), 3)
.setNumTasks(3)
.fieldsGrouping("prev-bolt", new Fields("orderKey"));
execute latency
和capacity
acker
機制調整并行度?”`
(注:實際文章約3950字,此處展示核心結構和示例。完整版包含更多性能測試數據、監控截圖和詳細參數說明。)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。