Flink笔记(十八):Flink 之 StateBackend 介绍 使用

Hedva ·
更新时间:2024-11-13
· 838 次阅读

1.StateBackEnd

       用来保存 State 的存储后端就叫做StateBackendStateBackend 默认是保存在 JobManager 的内存中,也可以保存在 本地文件系统 或者 HDFS 分布式文件系统中。

       当检查点(CheckPoint)机制启动时,状态将在检查点中持久化来应对数据的丢失以及恢复。而1.状态在内部是如何表示的? 2.状态是如何持久化到检查点中以及3.持久化到哪里都取决于选定的StateBackend。

2.可用的StateBackEnd

   Flink为我们提供了如下三种Statebackend实现:

     1. MemoryStateBackend
     2. FsStateBackend
     3. RocksDBStateBackend
在这里插入图片描述
       在没有配置 StateBackend 的情况下,Flink默认使用的是MemoryStateBackend。即:将 CheckPointing 数据保存在 JobManager 的内存中。

3.配置StateBackend

       Flink 提供了不同的StateBackend,用于指定 State 状态的存储方式和位置。
默认情况下,我们可以在配置文件 flink-conf.yaml 中确定所有Flink作业的 StateBackend。(打开109、114行注释,自行选择即可。filesystem为 hdfs)

       可能的配置项是 jobmanager (MemoryStateBackend)filesystem (FsStateBackend)** rocksdb (RocksDBStateBackend**),或者实现了状态后端工厂 FsStateBackendFactory 的类的完全限定类名,例如:为RocksDBStateBackend设置为org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
如下图所示:(配置完成后需重启 Flink)
在这里插入图片描述
       但是,在 flink-conf.yaml中配置的默认StateBackend,按我们开发的每一个Flink任务,StateBackend都是可以被覆盖的,如下所示

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(...); 4.StateBackend 使用 4.1 写入CheckPointing数据到 hdfs

 场景: 读取Socket中的数据,WordCount 求和。

4.1.1 添加 Hadoop 整合包

        Flink 与 Hadoop 整合包,请参考:Flink环境搭建,链接有介绍。将 2.7.5 版本 Hadoop 整合包,复制到 Flink 目录下的 lib 文件夹下,然后重启 Flink 集群。
在这里插入图片描述

4.1.2 代码 /** * TODO 写入CheckPoint数据到 hdfs * * @author liuzebiao * @Date 2020-2-15 20:13 */ public class StateBackEndDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //只有开启了checkpointing,才会有重启策略(多长时间执行一次checkpoint) env.enableCheckpointing(5000); //默认的重启策略是:固定延迟无限重启 //设置重启策略 env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2));//重启3次,隔2秒一次 //设置状态数据存储后端(此处设置后,会覆盖 flink-conf.yaml 中的配置) //env.setStateBackend(new FsStateBackend("hdfs://master:9000/test")); //系统异常退出或人为 Cancel 掉,不删除checkpoint数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //通过Socket实时获取数据 DataStreamSource lines = env.socketTextStream("192.168.204.210", 8888); //将数据转换成 Tuple 元组 SingleOutputStreamOperator<Tuple2> streamOperator = lines.map(new MapFunction<String, Tuple2>() { @Override public Tuple2 map(String str) throws Exception { return Tuple2.of(str, 1); } }); //keyBy() KeyedStream<Tuple2, Tuple> keyedStream = streamOperator.keyBy(0); //sum() SingleOutputStreamOperator<Tuple2> summed = keyedStream.sum(1); summed.print(); env.execute("StateBackEndDemo"); } } 4.1.3 将任务上传至集群,启动任务

    请参考:Flink 提交任务的两种方式。推荐使用 Web 页面方式上传。
在这里插入图片描述

4.1.4 查看任务执行页面 Checkpoints 信息

       我们可以点击目前运行的任务,然后查看 Checkpoints 的信息。此时任务ID:31f48d20d48f0bec256e6b4d24553b8a
在这里插入图片描述

4.1.5 查询 HDFS 记录的 CheckPoint

       CheckPoint 记录,配置文件 flink-conf.yaml 中,配置路径为:hdfs://192.168.204.210:9000/StateBackend,因为 1.1.1 代码中未作 StateBackend 路径覆盖,所以 CheckPoint 数据记录在 hdfs://192.168.204.210:9000/StateBackend路径下。
在这里插入图片描述
CheckPoint 中具体保存的信息,如下:
在这里插入图片描述

4.2 写入数据到本地系统

    同1.1.2 代码,只需修改为本地系统路径即可。其他代码不变。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("file:///D:/study_workspace/flink_demo/flink-java/backend"));

其他细节就不再做过多赘述了。

5.Flink 从 Checkpoint 中恢复数据

请跳转下文查看:Flink笔记(十九):Flink 从 Checkpoint 中恢复数据

Flink 之 StateBackend 部分,介绍到此为止

文章都是博主用心编写,如果本文对你有所帮助,那就给我点个赞呗 ^ _ ^

End


作者:扛麻袋的少年



flink

需要 登录 后方可回复, 如果你还没有账号请 注册新账号