這篇文章主要介紹了Flink的函數有哪些,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
1. Map: 將數據流中的數據進行一個轉化,形成一個新的數據流,消費一個元素,并且產生一個元素
具體代碼實現
package com.wudl.core;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @version v1.0
* @ProjectName Flinklearning
* @ClassName WordMap
* @Description TODO map 算子實例
* @Date 2020/10/29 10:15
*/
public class WordMap {
/**
* @param args
* Map 函數的用法
* 映射:將數據流中的數據進行一個轉化,形成一個新的數據流,消費一個元素,并且產生一個元素
*參數: Lambda 表達式或者,new MapFunction實現類
* 返回值:DataStream
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(1);
env.socketTextStream("10.204.125.140", 8899)
.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
String[] split = s.split(",");
return split[0] + "---" + split[1];
}
}).print();
env.execute();
}
}2. FlatMap:
將數據流中的整體拆分成一個 一個 的個體使用, 消費一個元素并產生零到多個元素
package com.wudl.core;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
/**
* @version v1.0
* @ProjectName Flinklearning
* @ClassName TransformFlatMap
* @Description TODO FlatMap
*
* FlatMap: 是一種扁平的映射,將數據流中的整體拆分成為一個個的個體使用, 消費后的元素產生零到多個元素
*
*
*
* @Author wudl
* @Date 2020/10/29 10:46
*
*
* 函數 FlatMap
* 將數據流中的整體拆分成一個 一個 的個體使用, 消費一個元素并產生零到多個元素
* 參數: lambda 表達式或者是FlatFunction的實現類
* 返回值:DataStream
*
*
*
*/
public class TransformFlatMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// DataStreamSource<List<Integer>> listDs = env.fromCollection(Arrays.asList(
// Arrays.asList(1, 2, 3),
// Arrays.asList(3, 4, 5),
// Arrays.asList(8,9,0)
// ));
// listDs.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
// @Override
// public void flatMap(List<Integer> list, Collector<Integer> collector) throws Exception {
//
// for (Integer number : list) {
// collector.collect(number + 100);
// }
//
// }
// }).print();
DataStreamSource<String> strDs = env.socketTextStream("10.204.125.140", 8899);
strDs.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split(",");
collector.collect(split[0]+split[1]);
}
}).print();
env.execute();
}
}第三種:Filter 對數據流的過濾根據指定的規則將滿足條件的(true) 的數據保留, 不瞞住條件的(false) 將丟棄
package com.wudl.core;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @version v1.0
* @ProjectName Flinklearning
* @ClassName TransformFilter
* @Description TODO 流的過濾
* @Date 2020/11/5 10:26
*/
public class TransformFilter {
/**
* 函數中Filter 中過濾
* 過濾:根據指定的規則將滿足條件的(true) 的數據保留, 不瞞住條件的(false) 將丟棄
* 返回值:DataStream
*/
public static void main(String[] args) throws Exception {
//1.獲取上下文的環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.設置并行度
env.setParallelism(1);
//3.獲取數據流
DataStreamSource<String> SourceDs = env.socketTextStream("10.204.125.140", 8899);
//4. 過濾數據流
DataStream<String> filter = SourceDs.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
String[] split = value.split(",");
return split[1].length() > 3;
}
});
filter.print();
env.execute();
}
}感謝你能夠認真閱讀完這篇文章,希望小編分享的“Flink的函數有哪些”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。