Flink笔记(二十一):Flink 之 OperatorState的使用(已实现Exactly-Once)

Agnes ·
更新时间:2024-11-13
· 715 次阅读

场景:

        从指定路径读取文件中的内容,即实现 tail -f 的功能。

文件:

在这里插入图片描述

需求: 并行读取数据; 获取 OperatorState 状态 + Checkpoint 机制,保证 Exactly-Once(已读数据在任务异常重启之后不会再次去读); 文件夹有多个.txt 文件,根据 subTask Index来读取文件。即 subTask0 读取 0.txt 文件,subTask 1读取 1.txt 文件。 代码:  1.自定义Source /** * TODO 自定义可并行Source(保证Exactly-Once) * --->extends RichParallelSourceFunction:用于实现可并行 Source * --->implements CheckpointedFunction:Checkpoint机制,用来保证 Exactly-Once * @author liuzebiao * @Date 2020-2-16 16:21 */ public class MyExactlyOnceParFileSource extends RichParallelSourceFunction<Tuple2> implements CheckpointedFunction { private String path = "C:\\Users\\a\\Desktop\\test"; private Boolean flag = true; private long offset = 0;//偏移量默认值 private transient ListState offsetState;//状态数据不参与序列化,添加 transient 修饰 public MyExactlyOnceParFileSource() { } public MyExactlyOnceParFileSource(String path) { this.path = path; } /** * run()方法,用于一直运行产生数据 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Tuple2> ctx) throws Exception { //获取 offsetState 中的历史值(赋值给offset) Iterator iterator = offsetState.get().iterator(); while (iterator.hasNext()) { offset = iterator.next(); } //获取当前 subTask 的 index 值 int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); //定义用于读取的文件路径 RandomAccessFile randomAccessFile = new RandomAccessFile(path+"/"+subtaskIndex+".txt", "r"); //从指定偏移量位置读取 randomAccessFile.seek(offset); //多并行线程不安全问题。需要加锁。其实 Flink 已经帮我们想到这种情况了。 final Object checkpointLock = ctx.getCheckpointLock();//最好用final修饰 while (flag) { String line = randomAccessFile.readLine(); if (line != null) { line = new String(line.getBytes("ISO-8859-1"), "UTF-8"); synchronized (checkpointLock){ //获取 randomAccessFile 已经读完数据的指针 offset = randomAccessFile.getFilePointer(); //将数据发送出去 ctx.collect(Tuple2.of(subtaskIndex+"",line)); } }else{ Thread.sleep(1000); } } } /** * cancel() 方法,用于关闭Source */ @Override public void cancel() { flag = false; } /** * 定期将指定的状态数据,保存到 StateBackEnd 中 * @param functionSnapshotContext * @throws Exception */ @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { //将历史值清除 offsetState.clear(); // 更新最新的状态值 offsetState.add(offset); } /** * 初始化状态(初始化OperatorState) 相当于subTask new完成之后构造器的生命周期方法,构造器执行完会执行一次 * * 从 StateBackend 中取状态 * @param context * @throws Exception */ @Override public void initializeState(FunctionInitializationContext context) throws Exception { //定义一个状态描述器(根据状态描述器 1.初始化状态 或者 2.获取历史状态) ListStateDescriptor stateDescriptor = new ListStateDescriptor( "offset-state",//指定状态描述器s称(可以随便定义,但是一个Job任务中不能重复) Types.LONG // TypeInformation.of(new TypeHint() {}) // Long.class ); //获取 operatorState 数据 offsetState = context.getOperatorStateStore().getListState(stateDescriptor); } }  2.任务代码

     从自定义Source中读取文件中的内容,即实现 tail -f 的功能。

/** * TODO 从自定义Source中读取文件中的内容,即实现 tail -f 的功能。 * * @author liuzebiao * @Date 2020-2-17 13:26 */ public class ReadTxtContentJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1.只有开启了CheckPointing,才会有重启策略 env.enableCheckpointing(5000); //2.设置并行度为2,因为读取的文件夹中就存放了2个文件(0.txt和1.txt) env.setParallelism(2); /**此部分读取Socket数据,只是用来人为出现异常,触发重启策略。验证重启后是否会再次去读之前已读过的数据(Exactly-Once)*/ /*************** start **************/ DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator streamOperator = socketTextStream.map(new MapFunction() { @Override public String map(String word) throws Exception { if ("exception".equals(word)) { throw new RuntimeException("Throw Exception"); } return word; } }); /************* end **************/ //3.读取自定义 Source 中的数据 DataStreamSource<Tuple2> streamSource = env.addSource(new MyExactlyOnceParFileSource()); //4.Sink部分,直接输出至控制台 streamSource.print(); env.execute("ReadTxtContentJob"); } } 测试结果: 任务启动,会读取 0.txt 和 1.txt 中的所有内容; 当向 0.txt 和 1.txt 文件中追加数据,会实时读取追加的内容; 当认为出现异常,启动重启策略。重启后,由于使用 Checkpoint 机制,之前读过的内容不会再读,保证了 Exactly-Once。
作者:扛麻袋的少年



flink once

需要 登录 后方可回复, 如果你还没有账号请 注册新账号