不使用 Flink 提供的 sum() 方法,使用 KeyedState 完成对单词分组求和计算操作。
代码:/**
* TODO 不使用 Flink 提供的sum()方法,对单词进行分组求和计算。
*
* @author liuzebiao
* @Date 2020-2-17 11:42
*/
public class KeyedStateDemo {
public static void main(String[] args) throws Exception {
//1.创建一个 flink steam 程序的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.使用StreamExecutionEnvironment创建DataStream
DataStreamSource lines = env.socketTextStream( "localhost", 8888);
//Transformation(s) 对数据进行处理操作
SingleOutputStreamOperator<Tuple2> wordAndOne = lines.map(new MapFunction<String, Tuple2>() {
@Override
public Tuple2 map(String word) {
//将每个单词与 1 组合,形成一个元组
return Tuple2.of(word, 1);
}
});
//进行分组聚合(keyBy:将key相同的分到一个组中)
KeyedStream<Tuple2, Tuple> keyedStream = wordAndOne.keyBy(0);
/** 使用 KeyedState 通过中间状态求和 ----- start ----**/
SingleOutputStreamOperator<Tuple2> summed = keyedStream.map(new RichMapFunction<Tuple2, Tuple2>() {
//状态数据不参与序列化,添加 transient 修饰
private transient ValueState valueState;
@Override
public void open(Configuration parameters) throws Exception {
//获取 KeyedState 数据
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("sum-key-state", Types.TUPLE(Types.STRING, Types.INT));
valueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public Tuple2 map(Tuple2 tuple2) throws Exception {
//输入的单词
String word = tuple2.f0;
//输入的次数
Integer count = tuple2.f1;
//根据State获取中间数据
Integer historyVal = valueState.value();
//根据State中间数据,进行累加
if (historyVal != null) {
historyVal += count;
//累加后,更新State数据
valueState.update(historyVal);
} else {
valueState.update(count);
}
return Tuple2.of(word,valueState.value());
}
});
/** 使用 KeyedState 通过中间状态求和 ----- end ----**/
//Transformation 结束
//3.调用Sink (Sink必须调用)
summed.print();
//启动(这个异常不建议try...catch... 捕获,因为它会抛给上层flink,flink根据异常来做相应的重启策略等处理)
env.execute("KeyedStateDemo");
}
}
3.测试结果:
可以实现和 sum() 方法一模一样的求和操作。
KeyedState Demo,介绍到此为止
文章都是博主用心编写,如果本文对你有所帮助,那就给我点个赞呗 ^ _ ^
End