Flink之slot、并行度、graph优化

Chynna ·
更新时间:2024-09-20
· 563 次阅读

一、Flink概述

Flink运行时主要角色有两个:JobManager和TaskManager。
JobManager主要是负责接受客户端的job,调度job,协调checkpoint等。
TaskManager执行具体的Task。TaskManager为了对资源进行隔离和增加允许的task数,引入了slot的概念,这个slot对资源的隔离仅仅是对内存进行隔离,策略是均分,比如taskmanager的管理内存是3GB,假如有两个个slot,那么每个slot就仅仅有1.5GB内存可用。Client这个角色主要是为job提交做些准备工作,比如构建jobgraph提交到jobmanager,提交完了可以立即退出,当然也可以用client来监控进度。

Jobmanager和TaskManager之间通信类似于Spark 的早期版本,采用的是actor系统。如下图
在这里插入图片描述

二、Flink的task

Flink每个算子都可以设置并行度,然后就是也可以设置全局并行度。
Flink的task按理说应该是每个算子的一个并行度实例就是一个subtask-在这里为了区分暂时叫做substask。那么,带来很多问题,由于flink的taskmanager运行task的时候是每个task采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。为了减轻这种情况,flink进行了优化,也即对subtask进行链式操作,链式操作结束之后得到的task,再作为一个调度执行单元,放到一个线程里执行。如下图的,source/map 两个算子进行了链式;keyby/window/apply有进行了链式,sink单独的一个。
在这里插入图片描述
说明:图中假设是source/map的并行度都是2,keyby/window/apply的并行度也都是2,sink的是1,总共task有五个,最终需要五个线程。

三、Slot Sharing

Slot Sharing是指,来自同一个Job且拥有相同slotSharingGroup(默认:default)名称的不同Task的SubTask之间可以共享一个Slot,这使得一个Slot有机会持有Job的一整条Pipeline,这也是上文提到的在默认slotSharing的条件下Job启动所需的Slot数和Job中Operator的最大parallelism相等的原因。通过Slot Sharing机制可以更进一步提高Job运行性能,在Slot数不变的情况下增加了Operator可设置的最大的并行度,让类似window这种消耗资源的Task以最大的并行度分布在不同TM上,同时像map、filter这种较简单的操作也不会独占Slot资源,降低资源浪费的可能性。

默认情况下,flink允许如果任务是不同的task的时候,允许任务共享slot,当然,前提是必须在同一个job内部。

结果就是,每个slot可以执行job的一整个pipeline,如上图。这样做的好处主要有以下几点:

1.Flink 集群所需的taskslots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就会均分到申请的所有slot里,这样slot的负载就均衡了。
链式的原则,也即是什么情况下才会对task进行链式操作呢?简单梗概一下:

上下游的并行度一致 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) 上下游节点都在同一个 slot group 中(下面会解释 slot group) 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 两个节点间数据分区方式是 forward(参考理解数据流的分区) 用户没有禁用 chain 四、slot和parallelism

1.slot是指taskmanager的并发执行能力
在这里插入图片描述
taskmanager.numberOfTaskSlots:3
每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlot

2.parallelism是指taskmanager实际使用的并发能力
在这里插入图片描述
parallelism.default:1

运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。

3.parallelism是可配置、可指定的
在这里插入图片描述
1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度

2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度

3.可以通过设置executionEnvironmentk的方法修改并行度

4.可以通过设置flink的编程API修改过并行度

5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.

6.设置合适的并行度,能提高运算效率

7.parallelism不能多于slot个数。

五、Operator Chain & Slot Sharing API

Flink在默认情况下有策略对Job进行Operator Chain 和 Slot Sharing的控制,比如:将并行度相同且连续的SingleOutputStreamOperator操作chain在一起,Job的所有Task都采用名为default的slotSharingGroup做Slot Sharing。但在实际的需求场景中,我们可能会遇到需人为干预Job的Operator Chain 或 Slot Sharing策略的情况,本段就重点关注下用于改变默认Chain 和 Sharing策略的API。

StreamExecutionEnvironment.disableOperatorChaining():关闭整个Job的Operator
Chain,每个Operator独自占有一个Task,如上图四所描述的Job,如果disableOperatorChaining则
source->map会拆开为source(),
map()两种Task,Job实际的Task数会增加到7。这个设置会降低Job性能,在非生产环境的测试或profiling时可以借助以更好分析问题,实际生产过程中不建议使用。 someStream.filter(…).map(…).startNewChain().map():startNewChain()是指从当前Operator[map]开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。 someStream.map(…).disableChaining():disableChaining()是指当前Operator[map]禁用Operator
Chain,即:Operator[map]会独自占用一个Task。 someStream.map(…).slotSharingGroup(“name”):默认情况下所有Operator的slotGroup都为default,可以通过slotSharingGroup()进行自定义,Flink会将拥有相同slotGroup名称的Operators运行在相同Slot内,不同slotGroup名称的Operators运行在其他Slot内。

Operator Chain有三种策略ALWAYS、NEVER、HEAD,详细可查看org.apache.flink.streaming.api.operators.ChainingStrategy。startNewChain()对应的策略是ChainingStrategy.HEAD(StreamOperator的默认策略),disableChaining()对应的策略是ChainingStrategy.NEVER,ALWAYS是尽可能的将Operators chaining在一起;在通常情况下ALWAYS是效率最高,很多Operator会将默认策略覆盖为ALWAYS,如filter、map、flatMap等函数。


作者:herokang



slot 并行 flink

需要 登录 后方可回复, 如果你还没有账号请 注册新账号