本篇文章為大家展示了如何進行Twitter Storm Stream Grouping編寫自定義分組實現,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
##自定義Grouping測試
Storm是支持自定義分組的,本篇文章就是探究Storm如何編寫一個自定義分組器,以及對Storm分組器如何分組數據的理解。
這是我寫的一個自定義分組,總是把數據分到第一個Task:
public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class); private List<Integer> tasks; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { this.tasks = targetTasks; log.info(tasks.toString()); } @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { log.info(values.toString()); return Arrays.asList(tasks.get(0)); } }
從上面的代碼可以看出,該自定義分組會把數據歸并到第一個Task<code>Arrays.asList(tasks.get(0));</code>,也就是數據到達后總是被派發到第一組。
測試代碼:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 2); //自定義分組, builder.setBolt("exclaim1", new DefaultStringBolt(), 3) .customGrouping("words", new MyFirstStreamGrouping());
和之前的測試用例一樣,Spout總是發送<code>new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}</code>列表的字符串。我們運行驗證一下:
11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan] 11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike] 11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson] 12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson] 12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike] 12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
從這個運行日志我們可以看出,數據總是派發到一個Blot:Thread-25-exclaim1。因為我時本地測試,Thread-25-exclaim1是線程名。而派發的線程是數據多個線程的。因此該測試符合預期,即總是發送到一個Task,并且這個Task也是第一個。
##理解自定義分組實現
自己實現一個自定義分組難嗎?其實如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一樣的道理。
Hadoop MapReduce的Map完成后會把Map的中間結果寫入磁盤,在寫磁盤前,線程首先根據數據最終要傳送到的Reducer把數據劃分成相應的分區,然后不同的分區進入不同的Reduce。我們先來看看Hadoop是怎樣把數據怎樣分組的,這是Partitioner唯一一個方法:
public class Partitioner<K, V> { @Override public int getPartition(K key, V value, int numReduceTasks) { return 0; } }
上面的代碼中:Map輸出的數據都會經過getPartition()方法,用來確定下一步的分組。numReduceTasks是一個Job的Reduce數量,而返回值就是確定該條數據進入哪個Reduce。返回值必須大于等于0,小于numReduceTasks,否則就會報錯。返回0就意味著這條數據進入第一個Reduce。對于隨機分組來說,這個方法可以這么實現:
public int getPartition(K key, V value, int numReduceTasks) { return hash(key) % numReduceTasks; }
其實Hadoop 默認的Hash分組策略也正是這么實現的。這樣好處是,數據在整個集群基本上是負載平衡的。
搞通了Hadoop的Partitioner,我們來看看Storm的CustomStreamGrouping。
這是CustomStreamGrouping類的源碼:
public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks); List<Integer> chooseTasks(int taskId, List<Object> values); }
一模一樣的道理,targetTasks就是Storm運行時告訴你,當前有幾個目標Task可以選擇,每一個都給編上了數字編號。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是讓你選擇,你的這條數據values,是要哪幾個目標Task處理?
如上文文章開頭的自定義分組器實現的代碼,我選擇的總是讓第一個Task來處理數據,<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不同的是,Storm允許一條數據被多個Task處理,因此返回值是List<Integer>.就是讓你來在提供的 'List<Integer> targetTasks' Task中選擇任意的幾個(必須至少是一個)Task來處理數據。
上述內容就是如何進行Twitter Storm Stream Grouping編寫自定義分組實現,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。