Flink實現UDF函式Filter、MapFunction

Flink暴露了所有udf函式的介面,實現方式為介面或者抽象類。

實現MapFunction介面示例:

實現溫度感測器例項轉換成(感測器Id-溫度)字串描述。

自定義MapFunction類

public class CustomMapFunction implements MapFunction { @Override public String map(SensorReading sensorReading) throws Exception { return sensorReading。sensorId+“-”+sensorReading。temperature; }}

轉換實現

// 建立流處理的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); // 從檔案中讀取資料 String inputPath = “F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor。txt”; // 獲取資料 DataStreamSource dataStream = env。readTextFile(inputPath); // 1、先轉換成SensorReading型別(簡單轉換操作) DataStream stream1 = dataStream。map(new MapFunction() { @Override public SensorReading map(String data) throws Exception { String[] arr = data。split(“,”); return new SensorReading(arr[0], arr[1], Double。valueOf(arr[2]。toString())); } }); // 呼叫自定義CustomMapFunction類的,轉換輸出 SingleOutputStreamOperator dataStreamMap = stream1。map(new CustomMapFunction()); dataStreamMap。print(“CustomMapFunction”); env。execute(“Function test”);

在idea 執行main 執行效果

Flink實現UDF函式Filter、MapFunction

實現FilterFunction介面示例:

實現過濾溫度大於30度的溫度感測器。

自定義FilterFunction類

public class CustomFilterFunction implements FilterFunction { @Override public boolean filter(SensorReading sensorReading) throws Exception { return sensorReading。temperature>30。0; }}

過濾實現

// 建立流處理的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); // 從檔案中讀取資料 String inputPath = “F:\\Projects\\BigData\\Flink\\FlinkTutorial\\src\\main\\resources\\sensor。txt”; // 獲取資料 DataStreamSource dataStream = env。readTextFile(inputPath); // 1、先轉換成SensorReading型別(簡單轉換操作) DataStream stream1 = dataStream。map(new MapFunction() { @Override public SensorReading map(String data) throws Exception { String[] arr = data。split(“,”); return new SensorReading(arr[0], arr[1], Double。valueOf(arr[2]。toString())); } }); // 呼叫自定義CustomFilterFunction類的,實現過濾 DataStream dataStreamFilter = stream1。filter(new CustomFilterFunction()); dataStreamFilter 。print(“CustomFilterFunction”); env。execute(“Function test”);

執行main 執行效果

Flink實現UDF函式Filter、MapFunction