本篇文章給大家分享的是有關如何進行storm1.1.3與kafka1.0.0整合,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
package hgs.core.sk;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
@SuppressWarnings("deprecation")
public class StormKafkaMainTest {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//zookeeper鏈接地址
BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");
//KafkaSpout需要一個config,參數代表的意義1:zookeeper鏈接,2:消費kafka的topic,3,4:記錄消費offset的zookeeper地址 ,這里會保存在 zookeeper
//集群的/test7/consume下面
SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");
//消費的時候忽略offset從頭開始消費,這里可以注釋掉,因為消費的offset在zookeeper中可以找到
sconfig.ignoreZkOffsets=true;
//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");
Config config = new Config();
config.setNumWorkers(1);
try {
StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
/* LocalCluster cu = new LocalCluster();
cu.submitTopology("test", config, builder.createTopology());*/
}
}
class MyboltO extends BaseRichBolt{
private static final long serialVersionUID = 1L;
OutputCollector collector = null;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
//這里把消息大一出來,在對應的woker下面的日志可以找到打印的內容
//因為得到的內容是byte數組,所以需要轉換
String out = new String((byte[])input.getValue(0));
System.out.println(out);
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}pom.xml文件的依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hgs</groupId> <artifactId>core.sk</artifactId> <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <name>core.sk</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <!-- <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-monitor</artifactId> <version>1.2.2</version> </dependency> --> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency> --> <dependency> <groupId>org.clojure</groupId> <artifactId>clojure</artifactId> <version>1.7.0</version> </dependency> <!-- 嘗試了很多次 都會有這個錯誤: java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272) 最后修改為kafka相應的kafka-clients版本后問題得到解決,應該是該出的問題 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.2</version> <configuration> <archive> <manifest> <!-- 我運行這個jar所運行的主類 --> <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef> <!-- 必須是這樣寫 --> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
以上就是如何進行storm1.1.3與kafka1.0.0整合,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。