溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

發布時間:2020-06-11 10:31:15 來源:網絡 閱讀:13667 作者:xpleaf 欄目:大數據

[TOC]


Storm核心概念之并行度

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

Work

1個worker進程執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。1個worker進程會啟動1個或多個executor線程來執行1個topology的(spout或bolt)。因此,1個運行中的topology就是由集群中多臺(可能是一臺)物理機上的一個或者多個worker進程組成的。

Executor

executor是worker進程啟動的一個單獨線程。

每個executor只會運行1個topology的1個或者多個(spout或bolt)task(注:task可以是1個或多個,storm默認是1個(spout或bolt)只生成1個task,executor線程會在每次循環里順序調用所有task實例)。

Task

task是最終運行spout或bolt中代碼的執行單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啟動后,1個(spout或bolt)的task數目是固定不變的,但該(spout或bolt)使用的executor線程數可以動態調整(例如:1個executor線程可以執行該(spout或bolt)的1個或多個task實例)。這意味著,對于1個(spout或bolt)存在這樣的條件:#threads<=#tasks(即:線程數小于等于task數目)。默認情況下task的數目等于executor線程數目,即1個executor線程只運行1個task。

默認情況下,一個supervisor節點最多可以啟動4個worker進程,每一個topology默認占用一個worker進程,每個spout或者bolt會占用1個executor,每個executor啟動1個task。

并行度調整之work進程個數

前面提交作業到集群時,worker、executor和task的數量情況如下:

之前是1個worker進程 3個executor線程 3個task任務
     3個executor,分別為:
         id_num_spout
         id_sum_bolt
         __acker

現在在代碼中將其worker個數設置為2,如下:

package cn.xpleaf.bigdata.storm.parallelism;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class ParallelismWorkerSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = ParallelismWorkerSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        /**
         * 之前是1個worker進程 3個executor線程 3個task任務
         *      3個executor,分別為:
         *          id_num_spout
         *          id_sum_bolt
         *          __acker
         *
         * 將worker進程修改為2個后:
         *      executor線程數:4個
         *      task任務數:4個
         *      分析:
         *          最簡單的原因就是,我們的應用程序太小了,完全沒有必要開啟多個executor線程。
         *          也就是說不會簡單的進行worker的副本拷貝,這里多出來的一個executor線程是每一個worker進程都有的
         *          一個默認的系統級別的bolt,就是__acker
         */
        config.setNumWorkers(2);    // 設置當前topology啟動需要幾個worker進程
        // config.setNumAckers(0);     // 設置__acker數量為0個,這樣就不會有其executor線程
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

打包后上傳到集群中并提交,在storm ui中查看其狀態,如下:

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

可以看到調整后,三者的數量情況為:

將worker進程修改為2個后:
     executor線程數:4個
     task任務數:4個
     分析:
         最簡單的原因就是,我們的應用程序太小了,完全沒有必要開啟多個executor線程。
         也就是說不會簡單的進行worker的副本拷貝,這里多出來的一個executor線程是每一個worker進程都有的
         一個默認的系統級別的bolt,就是__acker

如果不希望系統級別的__acker運行,可以在代碼中打開注釋:

config.setNumAckers(0);

即將其個數設置為0個,然后再上傳到集群中運行即可。

并行度調整之executor線程個數

需要在設置spoutbolt時指定:

builder.setSpout("id_order_spout", new OrderSpout(), 2);
builder.setBolt("id_sum_bolt", new SumBolt(), 3)
    .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件

完整程序代碼如下:

package cn.xpleaf.bigdata.storm.parallelism;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class ParallelismExecutorSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout(), 2);
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = ParallelismExecutorSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        /**
         * 之前是1個worker進程 3個executor線程 3個task任務
         *      3個executor,分別為:
         *          id_num_spout
         *          id_sum_bolt
         *          __acker
         *
         * 現在修改為:builder.setSpout("id_order_spout", new OrderSpout(), 2);
         *             builder.setBolt("id_sum_bolt", new SumBolt(), 3)
         *             所以應該有6個executor,分別為:
         *                 id_num_spout 2個
         *                 id_sum_bolt  3個
         *                 __acker      1個
         *                 同時task也為6個
         *
         */
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

上傳到集群中提交作業后,情況如下:

所以應該有6個executor,分別為:
    id_num_spout 2個
    id_sum_bolt  3個
    __acker      1個
    同時task也為6個

另外,如果這時查看輸出的log,會發現spout的輸出,很多情況下都是一下子輸出兩條信息,因為此時有兩個線程在運行,而bolt的情況也是類似的。

......
2018-04-13 10:22:30.406 STDIO [INFO] 當前時間20180413102230產生的訂單金額:422
2018-04-13 10:22:30.517 STDIO [INFO] 當前時間20180413102230產生的訂單金額:422
2018-04-13 10:22:30.519 STDIO [INFO] 商城網站到目前20180413102230的商品總交易額59013
2018-04-13 10:22:30.520 STDIO [INFO] 商城網站到目前20180413102230的商品總交易額55716
2018-04-13 10:22:31.407 STDIO [INFO] 當前時間20180413102231產生的訂單金額:423
2018-04-13 10:22:31.411 STDIO [INFO] 商城網站到目前20180413102231的商品總交易額62936
2018-04-13 10:22:31.518 STDIO [INFO] 當前時間20180413102231產生的訂單金額:423
2018-04-13 10:22:31.520 STDIO [INFO] 商城網站到目前20180413102231的商品總交易額59434
2018-04-13 10:22:32.408 STDIO [INFO] 當前時間20180413102232產生的訂單金額:424
2018-04-13 10:22:32.411 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額63360
2018-04-13 10:22:32.519 STDIO [INFO] 當前時間20180413102232產生的訂單金額:424
2018-04-13 10:22:32.521 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額59855
2018-04-13 10:22:32.523 STDIO [INFO] 商城網站到目前20180413102232的商品總交易額56140
2018-04-13 10:22:33.409 STDIO [INFO] 當前時間20180413102233產生的訂單金額:425
2018-04-13 10:22:33.520 STDIO [INFO] 當前時間20180413102233產生的訂單金額:425
2018-04-13 10:22:33.521 STDIO [INFO] 商城網站到目前20180413102233的商品總交易額60277
2018-04-13 10:22:33.523 STDIO [INFO] 商城網站到目前20180413102233的商品總交易額63785
2018-04-13 10:22:34.410 STDIO [INFO] 當前時間20180413102234產生的訂單金額:426
2018-04-13 10:22:34.521 STDIO [INFO] 當前時間20180413102234產生的訂單金額:426
2018-04-13 10:22:34.535 STDIO [INFO] 商城網站到目前20180413102234的商品總交易額60700
2018-04-13 10:22:34.535 STDIO [INFO] 商城網站到目前20180413102234的商品總交易額64211
2018-04-13 10:22:35.411 STDIO [INFO] 當前時間20180413102235產生的訂單金額:427
2018-04-13 10:22:35.522 STDIO [INFO] 當前時間20180413102235產生的訂單金額:427
......

并行度調整之task任務個數

需要在設置spoutbolt時指定:

builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2);
builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3)
    .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件

完整程序代碼如下:

package cn.xpleaf.bigdata.storm.parallelism;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class ParallelismTaskSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2);
        builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3)
                .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = ParallelismTaskSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        /**
         * 之前是1個worker進程 3個executor線程 3個task任務
         *      3個executor,分別為:
         *          id_num_spout
         *          id_sum_bolt
         *          __acker
         *
         * 現在修改為:builder.setSpout("id_order_spout", new OrderSpout()).setNumTasks(2);
         *             builder.setBolt("id_sum_bolt", new SumBolt()).setNumTasks(3)
         *             所以應該有3個executor,分別為:
         *                 id_num_spout 1個
         *                 id_sum_bolt  1個
         *                 __acker      1個
         *                 同時task為6個:
         *                      id_num_spout 2個
         *                      id_sum_bolt  3個
         *                      __acker      1個
         *
         */
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

運行后,其情況如下:

所以應該有3個executor,分別為:
    id_num_spout 1個
    id_sum_bolt  1個
    __acker      1個
    同時task為6個:
         id_num_spout 2個
         id_sum_bolt  3個
         __acker      1個

并行度調整總結

  • Worker(slot)

    • 默認一個從節點上面可以啟動4個worker進程,參數是supervisor.slots.ports。在storm配置文件中已經配置過了,默認是在strom-core.jar包中的defaults.yaml中配置的有。
    • 默認一個topology只使用一個worker進程,可以通過代碼來設置使用多個worker進程。
    • 通過config.setNumWorkers(workers)設置
    • 通過conf.setNumAckers(0);可以取消acker任務(點擊topology頁面最下面的show system stats,可以顯示系統級別的bolt,可以驗證acker線程的存在)
    • 最好一臺機器上的一個topology只使用一個worker,主要原因是減少了worker之間的數據傳輸
    • 如果worker使用完的話再提交topology就不會執行,會處于等待狀態

注意:worker之間通信是通過Netty?進行通信的

  • Executor
    • 默認情況下一個executor運行一個task,可以通過在代碼中設置
    • builder.setSpout(id, spout, parallelism_hint);
    • builder.setBolt(id, bolt, parallelism_hint);
  • Task
    • 通過boltDeclarer.setNumTasks(num);來設置實例的個數
    • executor的數量會小于等于task的數量(為了rebalance)

并行度動態調整

  • 通過UI調整
不推薦使用
  • 通過代碼調整
topologyBuilder.setBolt("green-bolt", new GreenBolt(),2)
    .setNumTasks(4).shuffleGrouping("blue-spout);
  • 通過shell調整
# 10秒之后開始調整
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance topologyName -w 10 -n 5 -e spout_id=3 -e id_bolt=10

注意:acker數目運行時是不會變化的,所以多指定幾個worker進程,acker線程數也不會增加。 -w:表示超時時間,rebalance首先會在一個超時時間內注銷掉拓撲,然后在整個集群中重新分配 worker。

問題:

-e spout_id=3 -e id_bolt=10 有時不會增加并發度

原因:

You can only increase the parallelism (number of executors) to the number of tasks. So if your 
component is having for example (number of executors: 50, number of tasks: 50) then you can not 
increase the parallelism, however you can decrease it.
就是說spout和bolt的并行數,最多可以調整到它的taskNum,默認情況下,taskNum是和你設置的paralismNum相同的。
#threads<=#tasks

并行度設置參考

那么到底并行度設置多少合適呢,理論參考值:

  • 單spout每秒大概可以發送500個tuple
  • 單bolt每秒大概可以接收2000個tuple
  • 單acker每秒大概可以接收6000個tuple
  • 根據上面的指標可以根據當前業務的數據量對并行度進行動態調整。

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

上面的案例對于分析storm的并行度會有非常大的幫助,同時也非常清晰地說明了worker、executor、task三者之間的關系。

Storm核心概念之流式分組(storm grouping)

假設storm集群現在有三個節點。一個作為nimbus,兩個作為supervisor。到這里先介紹一下storm邏輯上有兩個component,一個是Spout,另一個是Bolt。stream由Spout發出,在不同的Bolt之間進行處理,在其中傳遞的是storm的基本處理單位:Tuple。由Spout發出一個一個Tuple,然后Bolt接收Tuple進行各種各樣的處理。這一整個過程構成一個DAG(有向無環圖)。在storm里面叫做Topology。

Storm筆記整理(四):Storm核心概念與驗證——并行度與流式分組

上圖中spout的處理邏輯是將一句話發出給下一個Bolt,然后下一個Bolt做句子的單詞分割,下一個做計數,最后的Bolt做匯總顯示。這里可以有多個Bolt或者Spout進行并行處理。

那么這里有一個問題,數據是如何從spout到bolt中的呢,如果bolt是多個情況呢?這就是我們所說的流分組,也就是在Spout與Bolt、Bolt與Bolt之間傳遞Tuple的方式,我們稱之為流分組storm grouping。

Storm Group分類

  • Shuffle Grouping

    隨機分組, 隨機派發stream里面的tuple, 保證bolt中的每個任務接收到的tuple數目相同.(它能實現較好的負載均衡)

  • Fields Grouping

    按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到同一任務, 而不同的userid則會被分配到不同的任務

  • All Grouping

    廣播發送,對于每一個tuple,Bolts中的所有任務都會收到.

  • Global Grouping

    全局分組,這個tuple被分配到storm中的一個bolt的其中一個task.再具體一點就是分配給id值最低的那個task.

  • Non Grouping

    隨機分派,意思是說stream不關心到底誰會收到它的tuple.目前他和Shuffle grouping是一樣的效果,

  • Direct Grouping

    直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發送者具體由消息接收者的哪個task處理這個消息.只有被聲明為Direct Stream的消息流可以聲明這種分組方法.而且這種消息tuple必須使用emitDirect方法來發射.消息處理者可以通過TopologyContext來或者處理它的消息的taskid (OutputCollector.emit方法也會返回taskid)

  • localOrShuffleGrouping

    是指如果目標Bolt 中的一個或者多個Task 和當前產生數據的Task 在同一個Worker 進程里面,那么就走內部的線程間通信,將Tuple 直接發給在當前Worker 進程的目的Task。否則,同shuffleGrouping。(在工作中使用的頻率還是比較高的)

  • CustomStreamGrouping

    自定義流式分組。

Storm流式分組之Shuffle Grouping

將計算總和的例子,spout并行度設置為1,bolt并行度設置為3,group方式設置為Shuffle Grouping,程序代碼如下:

package cn.xpleaf.bigdata.storm.group;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class ShuffleGroupingSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = ShuffleGroupingSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

在集群中啟動后,查看輸出的日志信息:

......
2018-04-13 11:53:58.848 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115358的商品總交易額31
2018-04-13 11:53:59.846 STDIO [INFO] 當前時間20180413115359產生的訂單金額:14
2018-04-13 11:53:59.850 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115359的商品總交易額30
2018-04-13 11:54:00.847 STDIO [INFO] 當前時間20180413115400產生的訂單金額:15
2018-04-13 11:54:00.851 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115400的商品總交易額45
2018-04-13 11:54:01.848 STDIO [INFO] 當前時間20180413115401產生的訂單金額:16
2018-04-13 11:54:01.851 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413115401的商品總交易額60
2018-04-13 11:54:02.849 STDIO [INFO] 當前時間20180413115402產生的訂單金額:17
2018-04-13 11:54:02.852 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413115402的商品總交易額62
2018-04-13 11:54:03.851 STDIO [INFO] 當前時間20180413115403產生的訂單金額:18
2018-04-13 11:54:03.855 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413115403的商品總交易額78
2018-04-13 11:54:04.852 STDIO [INFO] 當前時間20180413115404產生的訂單金額:19
2018-04-13 11:54:04.856 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115404的商品總交易額50
2018-04-13 11:54:05.853 STDIO [INFO] 當前時間20180413115405產生的訂單金額:20
2018-04-13 11:54:05.858 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413115405的商品總交易額70
2018-04-13 11:54:06.855 STDIO [INFO] 當前時間20180413115406產生的訂單金額:21
......

可以看到有bolt有3個線程在執行。

Storm流式分組之All Grouping

將計算總和的例子,spout并行度設置為1,bolt并行度設置為3,group方式設置為AllGrouping,程序代碼如下:

package cn.xpleaf.bigdata.storm.group;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class AllGroupingSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .allGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = AllGroupingSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

上傳到集群提交后,輸出結果如下:

2018-04-13 12:42:36.992 STDIO [INFO] 當前時間20180413124236產生的訂單金額:1
2018-04-13 12:42:36.998 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124236的商品總交易額1
2018-04-13 12:42:36.999 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124236的商品總交易額1
2018-04-13 12:42:37.000 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124236的商品總交易額1
2018-04-13 12:42:37.995 STDIO [INFO] 當前時間20180413124237產生的訂單金額:2
2018-04-13 12:42:37.999 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124237的商品總交易額3
2018-04-13 12:42:38.000 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124237的商品總交易額3
2018-04-13 12:42:38.000 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124238的商品總交易額3
2018-04-13 12:42:38.996 STDIO [INFO] 當前時間20180413124238產生的訂單金額:3
2018-04-13 12:42:39.000 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124239的商品總交易額6
2018-04-13 12:42:39.000 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124239的商品總交易額6
2018-04-13 12:42:39.001 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124239的商品總交易額6
2018-04-13 12:42:39.998 STDIO [INFO] 當前時間20180413124239產生的訂單金額:4
2018-04-13 12:42:40.001 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124240的商品總交易額10
2018-04-13 12:42:40.002 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124240的商品總交易額10
2018-04-13 12:42:40.002 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124240的商品總交易額10
2018-04-13 12:42:40.999 STDIO [INFO] 當前時間20180413124240產生的訂單金額:5
2018-04-13 12:42:41.002 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124241的商品總交易額15
2018-04-13 12:42:41.003 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124241的商品總交易額15
2018-04-13 12:42:41.003 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124241的商品總交易額15
2018-04-13 12:42:42.000 STDIO [INFO] 當前時間20180413124242產生的訂單金額:6
2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413124242的商品總交易額21
2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413124242的商品總交易額21
2018-04-13 12:42:42.004 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413124242的商品總交易額21
......

可以看到有三個輸出的bolt都同時收到了spout發送過來的tuple,這確實有點浪費資源。

注意,上面查看bolt的輸出結果,這與多個線程只輸出一份數據不一樣,因為其三個輸出都會同時輸出相同的一份數據,而如果只是多個線程非AllGrouping的情況下,不會同一份數據輸出多次的,這點尤其需要注意。

Storm流式分組之Global Grouping

將計算總和的例子,spout并行度設置為1,bolt并行度設置為3,group方式設置為GlobalGrouping,程序代碼如下:

package cn.xpleaf.bigdata.storm.group;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 */
public class GlobalGroupingSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                StormUtil.sleep(1000);
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
                this.collector.emit(new Values(num));
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .globalGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = GlobalGroupingSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

打包上傳到集群運行后,查看其輸出結果如下:

2018-04-13 12:56:06.506 STDIO [INFO] 當前時間20180413125606產生的訂單金額:1
2018-04-13 12:56:06.515 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125606的商品總交易額1
2018-04-13 12:56:07.512 STDIO [INFO] 當前時間20180413125607產生的訂單金額:2
2018-04-13 12:56:07.516 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125607的商品總交易額3
2018-04-13 12:56:08.513 STDIO [INFO] 當前時間20180413125608產生的訂單金額:3
2018-04-13 12:56:08.517 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125608的商品總交易額6
2018-04-13 12:56:09.515 STDIO [INFO] 當前時間20180413125609產生的訂單金額:4
2018-04-13 12:56:09.519 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125609的商品總交易額10
2018-04-13 12:56:10.518 STDIO [INFO] 當前時間20180413125610產生的訂單金額:5
2018-04-13 12:56:10.521 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125610的商品總交易額15
2018-04-13 12:56:11.519 STDIO [INFO] 當前時間20180413125611產生的訂單金額:6
2018-04-13 12:56:11.523 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125611的商品總交易額21
2018-04-13 12:56:12.520 STDIO [INFO] 當前時間20180413125612產生的訂單金額:7
2018-04-13 12:56:12.524 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125612的商品總交易額28
2018-04-13 12:56:13.521 STDIO [INFO] 當前時間20180413125613產生的訂單金額:8
2018-04-13 12:56:13.525 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413125613的商品總交易額36
......

可以看到這與AllGrouping完全不同,三個bolt的executor線程,但是卻只有一個在執行操作。

通過前面三個流式分組方式的驗證,可以非常清晰地了解其含義:

ShuffleGrouping:三個bolt線程,同時執行,但對于同一個tuple數據,只有一個bolt會接收到,并且是隨機的。

AllGrouping:三個bolt線程,同時執行,但對于同一個tuple數據,3個bolt都會接收到。

GlobalGrouping:三個bolt線程,同時執行,但對于同一個tuple數據,只有固定一個bolt會接收到,其它2個bolt不會接收到。

Storm流式分組之Fields Grouping

在計算總和的例子上,再添加一個user_id的field,對其進行取模計算,同時在設置流式分組方式為根據user_id進行分組,并且為了驗證其概念,設置bolt的并行度為3,這樣理論上來說是,spout上產生的模為1 2 0的的userId的tuple會分別發送到三個不同線程ID的bolt上,后面我們只需要觀察輸出即可。

程序代碼如下:

package cn.xpleaf.bigdata.storm.group;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 *
 * 流式分組之filedsGrouping,字段分組
 *      有點像SQL中的group by
 *      或者可以理解為hash取模分區
 */
public class FieldsGroupingSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                long userId = num % 3;  // 0 1 2
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ",用戶-->" + userId + "<--產生的訂單金額:" + num);
                this.collector.emit(new Values(userId, num));
                StormUtil.sleep(1000);
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("user_id", "order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long userId = input.getLongByField("user_id");
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" +
                    StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "用戶-->" + userId + "<--的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .fieldsGrouping("id_order_spout", new Fields("user_id")); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = FieldsGroupingSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

打包上傳到集群并提交作業后,輸出結果如下:

2018-04-13 15:53:37.836 STDIO [INFO] 當前時間20180413155337,用戶-->1<--產生的訂單金額:1
2018-04-13 15:53:37.843 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155337用戶-->1<--的商品總交易額1
2018-04-13 15:53:38.839 STDIO [INFO] 當前時間20180413155338,用戶-->2<--產生的訂單金額:2
2018-04-13 15:53:38.844 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155338用戶-->2<--的商品總交易額2
2018-04-13 15:53:39.841 STDIO [INFO] 當前時間20180413155339,用戶-->0<--產生的訂單金額:3
2018-04-13 15:53:39.845 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155339用戶-->0<--的商品總交易額3
2018-04-13 15:53:40.842 STDIO [INFO] 當前時間20180413155340,用戶-->1<--產生的訂單金額:4
2018-04-13 15:53:40.846 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155340用戶-->1<--的商品總交易額5
2018-04-13 15:53:41.844 STDIO [INFO] 當前時間20180413155341,用戶-->2<--產生的訂單金額:5
2018-04-13 15:53:41.850 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155341用戶-->2<--的商品總交易額7
2018-04-13 15:53:42.848 STDIO [INFO] 當前時間20180413155342,用戶-->0<--產生的訂單金額:6
2018-04-13 15:53:42.851 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155342用戶-->0<--的商品總交易額9
2018-04-13 15:53:43.849 STDIO [INFO] 當前時間20180413155343,用戶-->1<--產生的訂單金額:7
2018-04-13 15:53:43.852 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155343用戶-->1<--的商品總交易額12
2018-04-13 15:53:44.850 STDIO [INFO] 當前時間20180413155344,用戶-->2<--產生的訂單金額:8
2018-04-13 15:53:44.853 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155344用戶-->2<--的商品總交易額15
2018-04-13 15:53:45.852 STDIO [INFO] 當前時間20180413155345,用戶-->0<--產生的訂單金額:9
2018-04-13 15:53:45.855 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155345用戶-->0<--的商品總交易額18
2018-04-13 15:53:46.853 STDIO [INFO] 當前時間20180413155346,用戶-->1<--產生的訂單金額:10
2018-04-13 15:53:46.856 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155346用戶-->1<--的商品總交易額22
2018-04-13 15:53:47.854 STDIO [INFO] 當前時間20180413155347,用戶-->2<--產生的訂單金額:11
2018-04-13 15:53:47.859 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155347用戶-->2<--的商品總交易額26
2018-04-13 15:53:48.855 STDIO [INFO] 當前時間20180413155348,用戶-->0<--產生的訂單金額:12
2018-04-13 15:53:48.860 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155348用戶-->0<--的商品總交易額30
2018-04-13 15:53:49.857 STDIO [INFO] 當前時間20180413155349,用戶-->1<--產生的訂單金額:13
2018-04-13 15:53:49.860 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155349用戶-->1<--的商品總交易額35
2018-04-13 15:53:50.859 STDIO [INFO] 當前時間20180413155350,用戶-->2<--產生的訂單金額:14
2018-04-13 15:53:50.862 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155350用戶-->2<--的商品總交易額40
2018-04-13 15:53:51.860 STDIO [INFO] 當前時間20180413155351,用戶-->0<--產生的訂單金額:15
2018-04-13 15:53:51.863 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155351用戶-->0<--的商品總交易額45
2018-04-13 15:53:52.861 STDIO [INFO] 當前時間20180413155352,用戶-->1<--產生的訂單金額:16
2018-04-13 15:53:52.863 STDIO [INFO] 線程ID:45 ,商城網站到目前20180413155352用戶-->1<--的商品總交易額51
2018-04-13 15:53:53.862 STDIO [INFO] 當前時間20180413155353,用戶-->2<--產生的訂單金額:17
2018-04-13 15:53:53.866 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413155353用戶-->2<--的商品總交易額57
2018-04-13 15:53:54.863 STDIO [INFO] 當前時間20180413155354,用戶-->0<--產生的訂單金額:18
2018-04-13 15:53:54.867 STDIO [INFO] 線程ID:47 ,商城網站到目前20180413155354用戶-->0<--的商品總交易額63
......

那么結果就顯而易見了,user_id模為1的tuple都發送到ID為45的線程上,user_id模為2的tuple都發送到ID為39的線程上,user_id模為0的tuple都發送到ID為47的線程上。

Storm流式分組之Custom Grouping

自定義流式分組,自定義的Custom Grouping如下:

/**
 * 自定義的流式分組
 * 模擬globalGrouping--->將所有的數據,傳遞到其中的一個task中
 * 模擬fieldsGrouping(后面有時間自己可以實現這一個)
 */
class MyCustomStreamingGrouping implements CustomStreamGrouping {

    private WorkerTopologyContext context;
    private GlobalStreamId stream;
    private List<Integer> targetTasks;

    /**
     * 類似自定義spout或bolt的初始化動作
     * @param context
     * @param stream
     * @param targetTasks   bolt對應的task的列表,如果我們在bolt.setNum(3)--->targetTasks的大小就是3
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.context = context;
        this.stream = stream;
        this.targetTasks = targetTasks;
        System.out.println("bolt對應的task列表: " + targetTasks);
    }

    /**
     *
     * @param taskId
     * @param values    就是tuple
     * @return
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if(targetTasks.size() < 1) {
            throw new RuntimeException("bolt的task個數居然為0,沒有任務執行作業");
        }
        return Arrays.asList(targetTasks.get(0));
    }
}

其實這就是模擬Global Grouping的自定義流式分組,依然是計算總和的例子,其代碼如下:

package cn.xpleaf.bigdata.storm.group;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
 * MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
 * 我們稱這為適配器模式
 *
 * 流式分組之customGrouping,用戶自定義分組
 */
public class CustomGroupingSumTopology {

    /**
     * 數據源
     */
    static class OrderSpout extends BaseRichSpout {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private SpoutOutputCollector collector; // 發送tuple的組件

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * 接收數據的核心方法
         */
        @Override
        public void nextTuple() {
            long num = 0;
            while (true) {
                num++;
                long userId = num % 3;  // 0 1 2
                System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ",用戶-->" + userId + "<--產生的訂單金額:" + num);
                this.collector.emit(new Values(userId, num));
                StormUtil.sleep(1000);
            }
        }

        /**
         * 是對發送出去的數據的描述schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("user_id", "order_cost"));
        }
    }

    /**
     * 計算和的Bolt節點
     */
    static class SumBolt extends BaseRichBolt {

        private Map conf;   // 當前組件配置信息
        private TopologyContext context;    // 當前組件上下文對象
        private OutputCollector collector; // 發送tuple的組件

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * 處理數據的核心方法
         */
        @Override
        public void execute(Tuple input) {
            Long userId = input.getLongByField("user_id");
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;

            System.out.println("線程ID:" + Thread.currentThread().getId() + " ,商城網站到目前" +
                    StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "用戶-->" + userId + "<--的商品總交易額" + sumOrderCost);
            StormUtil.sleep(1000);
        }

        /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 3)
                .customGrouping("id_order_spout", new MyCustomStreamingGrouping()); // 通過不同的數據流轉方式,來指定數據的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        String topologyName = CustomGroupingSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        // 啟動topology,本地啟動使用LocalCluster,集群啟動使用StormSubmitter
        if (args == null || args.length < 1) {  // 沒有參數時使用本地模式,有參數時使用集群模式
            LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

上傳到集群并提交作業,其輸出結果如下:

2018-04-13 16:21:23.919 STDIO [INFO] 當前時間20180413162123,用戶-->1<--產生的訂單金額:1
2018-04-13 16:21:23.924 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162123用戶-->1<--的商品總交易額1
2018-04-13 16:21:24.922 STDIO [INFO] 當前時間20180413162124,用戶-->2<--產生的訂單金額:2
2018-04-13 16:21:24.926 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162124用戶-->2<--的商品總交易額3
2018-04-13 16:21:25.923 STDIO [INFO] 當前時間20180413162125,用戶-->0<--產生的訂單金額:3
2018-04-13 16:21:25.926 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162125用戶-->0<--的商品總交易額6
2018-04-13 16:21:26.925 STDIO [INFO] 當前時間20180413162126,用戶-->1<--產生的訂單金額:4
2018-04-13 16:21:26.928 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162126用戶-->1<--的商品總交易額10
2018-04-13 16:21:27.926 STDIO [INFO] 當前時間20180413162127,用戶-->2<--產生的訂單金額:5
2018-04-13 16:21:27.930 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162127用戶-->2<--的商品總交易額15
2018-04-13 16:21:28.928 STDIO [INFO] 當前時間20180413162128,用戶-->0<--產生的訂單金額:6
2018-04-13 16:21:28.931 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162128用戶-->0<--的商品總交易額21
2018-04-13 16:21:29.929 STDIO [INFO] 當前時間20180413162129,用戶-->1<--產生的訂單金額:7
2018-04-13 16:21:29.932 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162129用戶-->1<--的商品總交易額28
2018-04-13 16:21:30.930 STDIO [INFO] 當前時間20180413162130,用戶-->2<--產生的訂單金額:8
2018-04-13 16:21:30.934 STDIO [INFO] 線程ID:39 ,商城網站到目前20180413162130用戶-->2<--的商品總交易額36
2018-04-13 16:21:31.932 STDIO [INFO] 當前時間20180413162131,用戶-->0<--產生的訂單金額:9
......

可以看到只有一個executor接收到tuple數據,也就是說,通過使用自定義流式分組,確實實現了Global Grouping的功能。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女