Spark 是一個基于內存式的分布式計算框架。具有高性能,高效可擴展,容錯等優點。
今天講解一下spark的流計算,其實它也不完全是實時的流計算,算是一種準實時的流計算。
上圖講解

運行環境:需要linux環境下的spark環境
本例用的centOS 6.5x64 因為需要使用TCP協議傳輸數據,所以需要安裝一個nc插件。
安裝方式: yum install ncxxx 或者掛載光盤安裝
安裝后啟動nc -lk 9999 端口可以隨便指定,最好是1024以上的就可以。
下面貼出代碼
java版本的
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.base.Optional;
import scala.Tuple2;
public class SparkDemo {
public static void main(String[] args) {
SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]");
JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5));
//使用帶狀態的算子,需要checkpoint做容錯處理
jsc.checkpoint("D://chkspark");
JavaReceiverInputDStream<String> socketTextStream=jsc.socketTextStream("10.115.27.234", 1000);
JavaDStream<String> wordsDstream=socketTextStream.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID=1L;
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> wordsToPairDstream=wordsDstream.mapToPair(new PairFunction<String, String,Integer>() {
private static final long SerialVersionUID=1L;
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
/**
* 一個batch對應一個RDD。
* */
JavaPairDStream<String, Integer> resultDstream=wordsToPairDstream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
private static final long serialVersionUID=1L;
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
Integer oldValue=0; //默認舊value是0
if (state.isPresent()) {
oldValue=state.get();
}
for (Integer value:values) {
oldValue+=value;
}
return Optional.of(oldValue);
}
});
//打印結果
resultDstream.print();
jsc.start();
jsc.awaitTermination();
}
}程序測試: 從linux端的nc 下輸入任意字符串,spark streaming會實時對輸入的數據做出統計。類似于wordcount. 除非手動kill這個進程,否則會一直運行下去。因為它的原理就是和自來水的水流一樣,是一連串的數據流。
運行結果展示:

也可以用scala寫出同樣的程序,代碼量更少。
需要深入理解spark streaming的架構原理。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。