溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

storm-kafka(storm spout作為kafka的消費端)

發布時間:2020-07-25 21:50:51 來源:網絡 閱讀:11308 作者:jethai 欄目:大數據

storm是grovvy寫的

kafka是scala寫的

storm-kafka  storm連接kafka consumer的插件

下載地址:

https://github.com/wurstmeister/storm-kafka-0.8-plus




除了需要storm和kafka相關jar包還需要google-collections-1.0.jar

以及zookeeper相關包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar

以前由com.netflix.curator組織開發現在歸到org.apache.curator下面



1.Kafka Consumer即Storm Spout代碼

package demo;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class MyKafkaSpout {
public static void main(String[] args) {
    
    String topic ="track";
    ZkHosts zkhosts  = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");
    
    SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
            "/MyKafka", //偏移量offset的根目錄
            "MyTrack");//子目錄對應一個應用    
    List<String> zkServers=new ArrayList<String>();
    //zkServers.add("192.168.1.107");
    //zkServers.add("192.168.1.108");
    for(String host:zkhosts.brokerZkStr.split(","))
    {
        zkServers.add(host.split(":")[0]);
    }
    
    spoutConfig.zkServers=zkServers;
    spoutConfig.zkPort=2181;
    spoutConfig.forceFromStart=true;//從頭開始消費,實際上是要改成false的
    spoutConfig.socketTimeoutMs=60;
    spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定義輸出為string類型
    
    TopologyBuilder builder=new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并發度設為1
    builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");
    
    Config config =new Config();
    config.setDebug(true);//上線之前都要改成false否則日志會非常多
    if(args.length>0){
        
        try {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } catch (AlreadyAliveException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }else{
        
        LocalCluster localCluster=new LocalCluster();
        localCluster.submitTopology("mytopology", config,  builder.createTopology());
        //本地模式在一個進程里面模擬一個storm集群的所有功能
    }
    
    
    
}
}


2.Bolt代碼只是簡單打印輸出,覆寫execute方法即可

package demo;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public void execute(Tuple input, BasicOutputCollector arg1) {
    String kafkaMsg =input.getString(0);
    System.err.println("bolt"+kafkaMsg);

    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1) {
        // TODO Auto-generated method stub

    }

}






向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女