图片来自unsplash
长文预警, 全文两万五千多字, 37页word文档的长度
(略有杂乱,有些非常复杂的地方可能需要更多的例子来说明,使得初学者也能很容易看懂,但是实在花的时间已经太多太多了,留待后边利用起碎片时间一点点修改吧。。。。毋怪。。)
分布式最难的2个问题
1. Exactly Once Message processing
2. 保证消息处理顺序.
我们今天着重来讨论一下
为什么很难
怎么解
前言
就作者学习流系统的感受来看, 流系统有2个难点, 第一是end to end consistency,或者说exactly once msg processing; 第二则是event time based window操作; 本来作者想用一篇文章同时概括和比较这2点,无奈第一点写完, 文章已经长度爆炸。于是分开2篇, 此为上篇, 着重于从分布式系统的本质问题出发, 从最底层的各种"不可能", 和它们的解(比如:consensus协议)开始, 一层一层的递进到高层的流系统中, 如何实现容错场景下的end to end consistency,或者说exactly once msg processing。
目录
流系统的具体对比在“9.流系统的EOMP”这一节, 前边都是准备知识...
一些术语
圣光(广告)不会告诉你的事
几个事实
Liveness和Safety的取舍
绝望中的曙光
Zombie Fencing
三节点间的EOMP
加入节点状态的三节点间的EOMP
流系统的EOMP
异步增量checkpointing
系统内与系统外
Latency, 幂等和non-deterministic
REFERENCE
一些术语
端到端一致性end to end Consistency一致性其实就是业务正确性, 在不同的业务场景有不同的意思, 在"流系统中间件"这个业务领域, 端到端的一致性就代表Exact once msg processing, 一个消息只被处理一次,造成一次效果; 注意: 这里的"一个消息"代表"逻辑上的一个", 即application对中间件的期待就是把此消息作为一个来处理, 而不是指消息本身的值相等, 比如要求计数+1的一个消息, 消息本身的内容可能一模一样, 但是application发来2次相同消息的"本意"就是要计数两次, 那么中间件就应该处理两次, 如果application由于超时重发了本意只想让中间件处理一次的+1操作, 那么中间件就应该处理一次; 中间件怎么能区分application的"本意"来决策到底处理一次还是多次, 是end to end consistency的关键.
EOMP
由于Exactly once msg processing太经常出现, 我们用EOMP来代替简写一下;
容错failure tolerance为了方便讨论,后边谈到failure, 我们指的都是crash failure, 你可以想象是任何可以造成“把机器砸了然后任何本地状态丢失(比如硬盘损坏)一样效果的情况出现"; 在今天的虚拟云时代,这其实很常见,比如container或者虚拟机被resource manager突然kill掉回收了, 那么即使物理机其实没有问题, 你的application的逻辑节点也是被完全销毁的样子;
容错在end to end Consistency的语义下,是指在机器挂了,网络链接断开...等情况下,系统的运算结果和没有任何failure发生时是一摸一样的.
Effective once msg processing(应该翻成有效一次性处理?)后边我们可以看到, 保证字面上的Exact once msg processing(即整个系统在物理意义上真的只对消息处理一次), 这在需要考虑容错的情况下是不可能做到的; Effective once msg processing是一个更恰当的形容,而所有号称可以做到EOMP的系统,其实都只是能做到Effective once msg processing; 即:中间件, 或者说流处理framework可能在failure发生的情况下处理了多次同一个消息,但是最终的系统计算 果和没有任何failure时, 一个消息真的只处理了一次时计算的结果相等; 这和幂等息息相关;
幂等Idempotent一个相同的操作, 无论重复多少次, 造成的效果都和只操作一次相等; 比如更新一个keyValue, 无论你update多少次, 只要key和value不变,那么效果是一样的; 再比如更新计数器处理一次消息就计数器+1, 这个操作本身不幂等, 同一个消息被中间件重"发+收"两次就会造成计数器统计两次; 而如果我们的消息有id, 那么更新计数器的逻辑修改为, 把处理过的消息的id全记录起来, 接到消息先查重, 然后才更新计数器, 那么这个"更新计数器的逻辑"就变成幂等操作了;
把本不幂等的操作转化为幂等操作是end to end consistency的关键之一.
确定性计算deterministic和幂等有些类似, 不过是针对一个计算; 相同的input必得到相同的output, 则是一个确定性(deterministic); 比如从一个msg里计算出一个key和一个value, 如果对同一个消息运算无数次得到的key和value都相同, 那么这个计算就是deterministic的, 而如果key里加上一个当前的时钟的字符串表示, 那么这个计算就不是确定性的, 因为如果重新计算一次这个msg, 得到的是完全不同的key;
注意1: 非确定性计算一般会导致不幂等的操作, 比如我们如果要把上边例子里的keyvalue存在数据库里, 重复处理多少次同一个msg, 我们就会重复的插入多少条数据(因为key里的时间戳字符串不同);
注意2: 非确定性计算并非必然导致不幂等的操作,比如这个时间戳没有添加在key里而是添加在value里, 且key总是相同的, 那么这个计算还是"非确定性"计算; 但是当我们存数据的时候先查重才存keyvalue, 那么无论我们重复处理多少次同一个msg, 我们也只会成功存入第一个keyValue, 之后的keyValue都会被过滤掉;
支持非确定业务计算的同时, 还能在容错的情况下达成端到端一致性, 是流系统的大难题, 甚至我们今天会提到的几个state of art的流系统都未必完全支持; (好吧Spark说的就是你)
圣光(广告)不会告诉你的事
分布式系统最tricky的问题就是, 问题看起来很普通很简单; 一些问题总是看起来有简单直接的解法,而一个"简单解"被人查出问题时,也总是看起来可以很简单的就可以把这个挑出的edge case很简单的解决掉; 然而我们会立刻发现解决这个edge case而引入的新步骤会引发新的问题... 如此循环, 直到"简单"叠加到"无法解决的复杂".
由于人们对这些问题的"预期是简单的", 所以很多书, online doc, 都大大简化了对问题的描述和对问题的分析; 最普遍的是对failure recovery的介绍, 一般只会简单的写"failure发生时, 系统会怎么recovery", 但是完全不提怎么检测failure和“根本不可能完美检测到failure”这个分布式系统的基本事实, 从而给了读者“failure可以完美检测”的错觉;
这是因为一来说清楚各种edge case会大大增加文档的复杂性, 另外一点是写了读者可能也看不明白, 还有就是广告效应, 比如真正字面意义的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系统都说自己可以支持exactly once, 那自己也得打这个广告不是; 还有就是语焉不详, 比如某stream系统说自己可以实现exactly once msg delivery, 别看delivery和processing好像差不多, 这里边的用词艺术就有意思了,delivery是指消息只在stream里出现一次, 但是在stream里只出现一次的消息却无法保证只被consume一次确根本不提; 再比如某serverless产品处理某stream的消息, 描述是保证旧的消息没有处理之前不会处理新消息, 你会想, 简单描述成保证消息按顺序处理不是一样么? 其实差大了去了, 前者并没有屏蔽掉旧消 突然replay, 覆盖掉新消息的处理结果的edge case, 而这个事实甚至颠覆了很多使用这个服务的Sr. SDE的对其的认知;
没有理解分布式系统的几个简单的本质问题之前, 你读文档的理解很有可能和文档真正精准定义的事实不符; 且读者对“系统保证”的理解, 往往会由于文档"艺术"定义的误导, 而过多的假设系统保证的"强", 直到被坑了去寻根问底, 才会收到"你误读了文档的哪里的详细解释";这是分布式系统"最难的地方在最普通的地方"的直接结果之一;
个人认为最好的办法就是去理解分布式系统软件算法所能达到的上限=>关于各种impossibility的结论的论文,然后去学习克服他们的方法的论文; 这样, 我们才能从各种简化了的 tutorials里, 从API中, 从各种云服务, 框架的广告词背后, 发现“圣光不会告诉你的事", 和"这个世界的真相";(从广告和online doc天花乱坠的描述中看到分布式系统设计真正的取舍, 这是区分API调包侠和分布式系统专家的分水岭之一); 而不是“简单的信了它们的邪”; 而下边,就是学习分布式系统,你所需要了解的最重要事实中, 和end to end consistency相关的几个;
几个事实
不存在完美的failure detector
很多关于分布式系统的书上都会说,当failure发生时系统应该怎么做来容错, 就好像可以准确的检测到failure一样; 然而事实是, 在目前互联网的物理实现上(share nothing architecture, 只靠网络互联,不直接共享其他比如内存物理硬盘等),我们无法准确的检测到failure;
简单来说,就是当我们发现一个node无反应的时候,比如ping它,给它发消息,request,查询,都没有反应,我们无法知道,这到底是对方已经停止工作了,还是只是处理的很慢而已; 无法制造完美的failure detector, 即使在今天也是分布式系统的基础事实; 本文无意在基础事实上多费唇舌, 无法接受此事实者可以去翻相关论文; ╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is only “very slow”. [30]不存在完美的failure detector, 所导致的几个颠覆你认知的问题:
分布式共识问题"Consensus"在"不存在完美的Failure Detector的情况下"不可解“, 这又叫做FLP impossibility[36], 可以说是上世纪,奠定分布式系统研究基石, 方向的一篇论文; 即: 在理论上, 在分布式环境里(更准确说应该是异步环境里), 在最多可能出现 个crash failure的强假设下, 不存在任何一个算法可以保证系统里的所有的"正常"节点对某一信息有共识; 对于"共识"你可以理解为一个数据一摸一样的备份在多个节点上; (那么paxos, raft这种consensus协议是怎么回事呢? 稍后会解释)
在分布式环境, 连分配只增序列号这件事都很难(即不同的进程去向一个系统申请序列号, 从0开始不断增加, 保证process得到的序列号不能重复), 因为本质上这是一个consensus问题, 后边可以看到, 能够分配高可用性的global序列号(epoch id), 是解决zombie leader/master/processor的问题时的一大助力;
在保证liveness的情况下(即检测到失败就在另外的机器重启逻辑节点), 无法保证系统中的Singleton角色“在同一时间点”只有一个; 比如在有leader概念的分布式系统里, 要求任意时间点只有一个leader做决定, 比如HBase需要只能有一个Region Server负责某region的写操作; 再比如kafka或者Kinesis[22]里需要只能有一个partition processor接受一个stream partition的信息并且采取行动; 而事实是, 任何云服务和现有实现, 都无法在物理上保证“在同一时间点”, 真的只有一个这样的逻辑角色存在于机群中; 这就牵涉到一个概念=> Zombie Process.
Zombie process, 由于没有完美的failure detector, 所以即使几率再低, 只要时间够长, 需要failure detection的用例够多, 系统不可避免会错误的判断把一个并没有真正crash掉的process当作死掉了; 而如果系统需要保持高可用性,需要在检测到crash的时候,在新的机器上启动此process继续处理,那么当failure detector出错,则会发生新老process共同工作的问题,此时,这个老的process就是zombie process;
严重注意,在分布式系统里,我们需要单一责任的一个节点/processor/role来做决策或者处理信息时,我们要么不保护系统的高可用性(机器挂了就停止服务),要么解决zombie process会带来的问题;高可用性的系统中, zombie无法消除;这关系到分布式系统设计里的一个核心问题:liveness和safety的取舍;
Linveness和Safety的取舍
在缺乏完美的failure detector的情况下, 对方迟迟不回信息(ping它也不回), 不发heartbeat, 那么本机只有2个选择: 1. 认为对方还没有crash, 持续等待; 2. 认为其crash掉了, 进行failover处理;
选择1伤害系统的liveness, 因为如果对方真的挂了,我们会无限等待下去, 系统或者计算就无法进行下去; 选择2伤害系统的safety, 因为如果对方其实没有crash, 那我们就需要处理可能出现的重发去重, 或者zombie问题, 即系统的逻辑节点的“角色唯一性“就会被破坏掉了;
越好的liveness要求越快的响应速度, 而“100%的safety“的意义, 则因系统的具体功能的不同而不同, 但一般都要求系统做决定要小心谨慎, 不能放过一个edge case, 穷尽所有必要的检查来保证"系统不允许出现的行为绝对不会发生"; 在consensus的语义下来说, safety就是绝对不能向外发出不一致的决定(比如向A说决定是X, 后来向B说决定是Y);
可以看到, 系统的edge case越多, safety越难保证, 而edge cases的全集只是可能发生的情况的集合, 而某一次运行只会发生一种情况(且大概率是正常情况); 如果系统不检查最难分辨最耗时的几种小概率发生的edge case, 那么系统大概率(甚至极大概率)也可以完美运转毫无问题几个月, 运气好甚至几年; 这样降低了系统的safety(不再是100%), 但是提高了系统的响应速度(由于是概率上会出问题, 所以即使降低了safety保证, 也不是说就一定会出问题, 只是你把系统的正确性交给了运气和命运); 而如果系统保证检查所有的edge case, 但是系统99.9999%的概率都不会进入一些edge cases, 那么这些检查就会阻碍正常情况的 算速度; Liveness和Safety, 这是分布式系统设计的最基本取舍之一;
而FLP则干脆说: 在分布式consensus这个问题里, 如果你想要获得100%的系统safety, 那么你绝对无法保证系统liveness, 即:系统总是存在活锁的可能性, 算法设计只能减小这个可能性, 而无法绝对消除它;
更多的safety VS. liveness 取舍的例子Kubernetes StatefulSet, 简单说是可以给容器(pod/container)指定一个名字的, 且保证全cluster总是只有一个容器可以有这个名字, 这样application就可以通过这个保证来指定机群中的逻辑角色, 且用这个逻辑容器中保存一些状态; (一般的replicaSet会load balance连接或请求到背后不同的节点, 你的一个请求要求在server本地存一些状态, 下一个请求未必还会到同一个server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]Kubernetes StatefulSet在liveness和safety里选择了safety, 当statefulSet所在的的物理节点"挂了"之后, kubernetes默认不会重启这个pod到其他节点去, 因为它无法确定这个物理节点到底死没死, 为了保证safety它选择放弃了liveness, 即系统无法自愈, StatefulSet提供所提供的服务不可用, 直到靠人干预来解决问题;
([38] P305: 10.5. Understanding how StatefulSets deal with node failures)Akka Cluster也做了相同的选择, 在cluster membership管理中,有一个auto-downing的配置, 如果你打开它, 那么cluster就会完全相信Akka的failure detection而自动把unreachable的机器从cluster中删去, 这意味着一些在这个unreachable节点上的Actor会自动在其他节点重启; Akka Cluster的文档中, auto-downing是强烈不推荐使用的[38], 这是由于Akka Cluster提供的很多feature要求角色的绝对单一性, 比如singleton role这个功能, 在保证“cluster里只有这一个节点扮演这个actor"(safety), 和保证"cluster里总要有一个节点扮演这个actor"(liveness) 中, 选择了safety, 即保证at most one actor存在于cluster中, 一旦次actor的节点变成unreachable(比如机器真的挂了), 那么Akka也无能为力, 只能傻等这个节点回来或者人来干预决策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]一个商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能点的解决方案(基于quorum), 有兴趣的同学可以看引用文档[29];
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]为什么Kubernetes和Akka不能同时保证safety和liveness呢?这是因为这两个作为比较底层的平台, 他们需要对上层提供非常大的自由性, 而不能限制上层的活动; 比如kubernetes没有规定用户不能在pod上跑某种程序, Akka也没有规定用户不能写某种actor的code; 这样, 在不限制自己处理能力的同时要保证任何行为都看起来exactly happen once(因为语义上singleton节点只有一个, 那么就不能让用户写的任意单线程程序出现多节点平行执行的外部效果), 而这对中间件来说是不可能的, 这就引出了另外一篇论文: end to end argument[27], 作者已经写过一篇文章详细介绍end to end argument(阿莱克西斯:End to End Argument(可能是最重要的系统设计论文)), 这里不在赘述; 后边我们可以看到Flink, Spark等流系统为了保证exactly once msg proces sing需要怎样和end to end argument 搏斗;
可以同时保证safety和liveness么
取决于具体情况下对safety和liveness的具体要求, 在流处理的情况下, 至少本文提到的4种流系统都给出了自己的解; 请耐心往下阅读
绝望中的曙光
可解也不可解的分布式consensus由于异步环境下, 钉死了我们不可能有一个完美不犯错的failure detector; 这篇著名的论文Unreliable Failure Detectors for Reliable Distributed Systems[30] 详细描述了即使我们用一个不准确的failure detector, 也可以解决consensus的方法; 但是它并没有推翻FLP impossibility的结论:Consensus还是并非绝对可解; 但是, 如果我们对需要consensus的计算加一个限制,则Consensus可解;
这个限制是: 计算和通讯只需要在"安全时间"内完成即可, 对[30]提供的算法来讲, 提供consensus的系统需要在这段时间内"正确识别crash"即可,也就是说(1)识别出真正挂掉的node, 和(2)不要怀疑正确的node;
怎么理解呢, 这两个看似对立的概念: (1)consensus的有解(比如paxos协议)是对的, (2)consensus的无解证明:FLP impossibility也是对的; 要准确且简单的解释为什么它们都是对的有点难, 推荐还是看论文; 但是用比喻来解释的话, 根据[30], Consensus算法可以看作这样一个东西, 当系统出现crash, failure detector判断错误,或者网络突然延迟...等时候, 算法会进入某种循环而不会轻易作出决定;
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]而只要满足必要的条件时(计算和通讯只需要在"安全时间"内完成), 系统则可以跳出循环让机群达成一致[30,31];
(1) There is a time after which every process that crashes is always suspected by some correct process.而FLP impossibility则可以理解为挑刺儿的说, 那这个条件永远无法出现呢? 你的算法就活锁了呀(丢失liveness);
幸运的是, 在现实世界, 我们总可以对消息传递和处理来估计一个上限, 你可以理解为,只要消息处理总是在这个上限之内完成,那么consensus总是可以实现, 而消息处理的时间即使偶尔超过了这个上限, 我们的consensus协议也会进入安全循环自我保护, 从而不会破坏系统的safety, 而系统总是可以再次回归平稳(处理时间回归上限之内); 而FLP则是像说: 你无法证明系统总是可以回归平稳 (确实无法证明, 因为FLP的前提是异步模型, 而我们的真实世界更像是介于异步和同步模型之间的半同步模型, 我们只能说极大概率系统可以"回归平稳", 而无法证明它的绝对保证; =>可以绝对保证"上限"的模型一般称为同步模型);
其实用paxos来模拟出FLP的活锁的例子也很简单, 你把节点间对leader的heartbeat timeout时间设为0.001ms, 那么所有的节点都会忙着说服别的其他节点自己才是leader(因为太短的保活时间, 除了自己, 节点总是会认为其他的任意节点是leader时, leader死了), 那么系统就会进入活锁, 永远无法前进达成cluster内的consensus, 系统丧失liveness;
Zombie Fencing
即使consensus问题解决了, zombie节点也还是大问题, kubernetes和Akka可以选择避开zombie, 损失liveness; 然而对于绝大多数分布式系统来说, 是 必须直面zombie节点这个问题的,比如各种分布式系统的master节点, 如果master挂了整个系统不在另外的机器重启master,整个系统就可能变为不可用; 再比如kafka和Kinesis的单一partition只能有一个consumer, 如果这个msg consumer挂了不自动重启, 对消息的处理就会完全停止;
zombie是最容易被忽视的问题, 比如, 即使我们有了paxos, raft, zookeeper这种consensus工具可以帮我们做leader election, 也不要以为你的系统中不会同时有2个leader做决策了; 这是因为先一代的leader可能突然失去任何对外通信,或者cpu资源被其他进程吃光, 或者各种edge case影响, 使得其他节点无法和其通信, 新的leader被选出, 而老的leader其实还没死, 如果老的leader在失去cpu之前的最后一件事是去写只有leader才能写的数据库, 那么当它突然获得cpu时间且网络恢复正常, 那么这个以为自己还是leader的zombie leader就会出乎意料的去写数据库;
这曾经是HBase的一个重大bug[39, Leader Election and External Resources P105],
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...(解释不动了, 大家看英文吧...)
其实对付zombie已经是分布式系统的共识了,也有很多标准的解法,以至于各个论文都不会太仔细的去描述, 这里简单介绍几种方法:
zombie fencing设计的关键点在于如何阻止已经“成为zombie的自己”搞乱正常的“下一代的自己”的状态;毕竟无论是zombie还是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代码逻辑,也就是说这同一段代码, zombie来跑就"不能过:"(比如不能对系统的状态造成影响), 但是"下一代"来跑, 就可以正常工作;这一般需要满足以下几点:
如何正确区分“正常的下一代”(由于怀疑当前的节点已经死掉了,所以重新创建和启动的新一代逻辑节点)和“zombie”(怀疑错误,当前节点并没有死掉,但是新一代节点已经创建并启动,当前节点成为大家都以为死掉但是还活着的zombie)一般需要一个多机复制且稳定自增的epoch number来确定新老逻辑节点;这个epoch number要在分布式环境中稳定自增,一般只能通过consensus协议来实现;否则要分配新一代epoch number时,由于管理epoch number的机群的failover造成分配了一个老的epoch number给新启动的“下一代”,那么zombie反而会有更大的epoch number,这就会造成整个系统的状态混乱;怎样的混乱在介绍完zombie fencing之后就显而易见了(因为所有其他节点都以为zombie死掉了, 把所有的最新操作和状态发给新节点,但是新节点却有一个比zombie还 小的epoch number, 从而被zombie fence掉, 而不是自己可以fence zombie)
会被zombie影响的系统需要特殊设计使得:当“新一代”注册后就拒绝“老一代的任何请求”,特别是写入请求;也就是具体的利用epoch number的zombie fencing的实现; 一般需要具体情况具体分析;
如果被影响的系统是自己的一个microservice,那么可以随意设计协议来验证一个请求所携带的epoch number是不是最新的;而当这个被影响的系统是一个外部系统, 比如是业务系统需要用到的一个数据库,由于你没法改数据库的代码和数据库client与server之间的协议, 那么就要利数据库所提供的功能或者说它的协议来设计application层级的zombie fencing协议;比如对提供test and set,compare and swap的kv数据库来说,application设计自己的业务表时,要求所有的表都必须有一个epoch字段,而所有的写入都必须用test and set操作来检测当前epoch字段是否比要写入的请求的epoch字段大或相等, 否则拒绝写入; 这样, 只有"下一代"可以更改zombie写入的数据, 而zombie永远无法更改"下一代"插入或者更新过的数据;
另一方面,很多时候"下一代"需要读取上一代的信息,继承上一代的数据,然后继续上一代的工作;那么如果刚读取完数据,zombie就改变了数据,那么"下一代"对于当前系统状态的判断就会出差错;一个general的解决的方法也很简单,要读先写,“下一代”开始工作前, 如果要先读入数据了解“系统的当前状态”,必须先改变数据的epoch number为自己的epoch number(当然要遵从只增更改原则test and set,如果发现当前数据的epoch number已经比自己的epoch number还大了,则说明自己也已经是zombie了,更新的"下下一代"已经开始工作), 更改数据的epoch number成功之后,再读入数据,就可以保证比自己老的zombie绝对不可能再更改这个数据,而现在读入的数据可以体现系统的最新状态,从而完成对"老一代"数据的继承;而在增加epoch number之前所有被写入的数据;这里即使是"新一代"启动之后, 读取系统状态之前被zombie写入的数据, 都可以看做老一代的合法数据,只要被新一代开始工作前继承读入即可; 我们所要避免的是"新一代" 所读取的事实被zombie所更改; 而不是在物理时间的意义上在"新一代"启动时就立刻阻止zombie的所有系统改动;
zombie fencing的设计取决于分布式系统的具体情况,比如业务逻辑可能更改的数据范围可能是几百万几千万的数据记录,那么这也意味着zombie可能会修改的数据范围非常大,那么要求"下一代"在开始工作前更改所有数据的epoch number就很不现实;
对于zombie的影响的耐受性也会影响zombie fencing的设计,比如如果"下一代"只需要自己所接触的有限数据在特定时刻之后不被zombie影响就能正确工作, 那么只要在"下一代"需要接触特定数据时才更改此数据的epoch number来屏蔽zombie即可,那么即使业务可能修改的数据范围很大,简单的更改数据的epoch number也还是可以接受的解决方案;
最糟糕的情况,如果"zombie"可能会插入新的数据, 而"下一代"的正常工作 需要不能有非法的新数据插入(比如下一代开始工作前先统计所有资源的个数,然后开始基于这个事实和"只有自己才能更改资源"的假设,作出各种决策, 而此时zombie突然插入了一条新资源记录或者资源使用记录...),如果"新一代"完全无法预测zombie会插入什么记录,要阻止zombie随意插入数据,“新一代”就只能在利用predicate lock来防止新纪录插入,且不说很多数据库根本不支持“锁住不存在的数据”的predicate lock,就算支持此功能的数据库也很有可能是使用表级锁来锁住整张表;如果数据表 设计成了需要共享给多个节点使用(比如一张资源表,不同的singleton worker负责维护不同的资源范围),那么表级锁就会妨碍其他worker的工作;
zombie fencing的设计在于如何引入简单的fencing点, 对"新一代"畅通无阻,但是却可以阻止zombie的异常活动, 如果协议设计使得"新一代"可以很容易制造这个fencing点, 则"新一代"在启动或者需要的时候加入fencing点即可, 比如前边说的任何数据都要附带一个epoch值,任何数据写入都要用test and set来对比数据的当前epoch值和请求的epoch值; 对于上文的随机插入的业务需求, 可以要求业务逻辑插入任何数据之前,先在一个注册表的属于自己epoch的一行里记录自己要写的数据的id, 且在记录的时候用test and set来检测自己这一行数据的active值是否被更改为disable了;这样就相当于引入了一个更简单的fencing点,因为"下一代"只要在注册表里把所有上一代的记录写为disable, 就可以阻止zombie的未来任何活动,但是此时无法阻止zombie的最后一个注册的数据插入, 但是"下一代"可以简单的读注册表得知这个数据的id, 从而对这个"最后的zombie写入"采取相应的策略(继承,删除, 甚至fencing, 比如这个id并不存在,那么无法得知是zombie真的在写之前死了所以永远不会插入这个记录了,还是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注册的id先插入一条记录来占位,这样无论zombie是真的死了还是卡了,都无法再写入这个数据了);这样,我们就引入了一个连数据插入都可以fencing的fencing点;
Zombie fencing一般都是以上这些套路, 用consensus协议确定epoch number区分"下一代"还是zombie,这个epoch number一般也可以称为fencing token, 通过把fencing token分发给需要拒绝zombie的service,把fencing token和需要保护的数据(防止被zombie修改)存在一起;所以一般论文[7, 26]里只会简单的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token
三节点间距的EOMP
三点为 (上游/input提供端)=> (当前计算节点/计算结果发送端) =>(下游/计算接收端)
如果我们考虑必须保证系统的高可用性,即检测到任意process的failure,都会由一个(绝对不死)的高可用的JobManager或者MasterNode,来重启(可能在另外的node)这个process, 所以我们定义这种即使所在的host挂掉, 也会不断重新在其他host上重启的process为逻辑process; 这时我们要面临几种可能造成inconsistent的情况:
"计算接受端"没有成功ack"计算结果发送端"的消息,一般表现为发送端的等待ack 超时;根据之前的讨论,接收端有可能把消息处理完毕了(ack的消息丢失,或者刚处理完消息还没发ack就挂了…等情况),也可能没有处理完毕(没接到或刚接到消息就挂了…等情况);
这种情况发送端可以重发信息, 而发送端是需要“上游input提供端”提供某种数据然后进行某种计算后产生的这个消息/计算结果(设为outputA), 那么"计算结果发送端"有两个策略:
策略1: 利用存储计算结果来尽量避免重算
要实现上下游exact once processing,需要实现4个条件
(a. 结果高可用, b.下游去重, c. 上游可以replay, d. 记录上游进度)
a. 要求结果高可用, 应对timeout时, “下游计算接收端”其实并没有成功处理"计算结果发送端"的计算结果的情况(比如下游挂了), 这时"计算结果发送端"可以把计算的结果存储在高可用的DataStore里(比如DynamoDB,Spanner…或者自己维护的多备份数据库); 这样超时只要重发这个计算结果即可, 自己甚至可以开始去做别的事情, 比如处理和计算下一个来自“上游/input提供端“的event, 而已经被“下游计算接收端”ack的"计算结果"则可以清理,一 由异步的garbage collection清理掉. 注意, 由于存在存储失败的可能性, 或者刚计算完结果还没来得及存储就挂掉重启的可能,我们无法真的保证避免重算;详见:无法避免的重算 的例子
b.下游去重,应对timeout时下游其实已经处理完毕消息的情况
一般的解决方案:当逻辑接收端不固定, 比如发送端要根据计算出来的outputA的某key字段把不同的key发送给负责不同key range(也就是partition)的多个"下游计算接收端"; 只需要一个sequenceId就可以实现接收端的消息去重;接收端和发送端各维护一个partition level的sequenceId即可;这样当发送端收到当前message sequenceId(假设为n)的Ack才发下一个sequenceId为n+1的信息,否则就无限重试;而接收端则根据收到的消息的id是不是已经处理过的最大id+1来判断是这是不是下一个message。
Google MillWheel的特例:Google MillWheel做出了一个很有意思的选择,发送端完全不维护sequenceId,而是为每一个发出的message生成一个全局唯一的id,下游则需要记住"所有"见过的id来去重,但是这样会造成大量查询io和存储cost,所以需要另外的方案来解决性能和下游没有无限的存储所以"不可能记住所有id"的问题。这个例子比较特殊,有兴趣的同学可以查阅[4,7]
c. 要求触发本次计算的“上游input提供端”可以replay input event,否则刚接到event还没计算就挂掉重启, 则event丢失;
无法避免的重算:任何时候计算没完成,或者计算完成后但是成功储存前(a.结果高可用的需求), 计算节点fail掉重启, 我们都需要replay上次计算过的input event,所以由于计算结果都还没存成功,所以从物理上讲, 此时我们还是重算了的; 所以即使我们采用把计算结果记录下来的策略, 我们无法从物理意义上真正避免重算, 我们避免的是有多个"重复的"成功计算结果提交给下游;而当计算不是deterministic的, 这多个“重复的”计算结果可能是不同的值发送给不同的下游((比如按照计算结果的key发送给下游不同的partition); 那么下游就会处理同一个event所产生的本应只有一个的计算结果两次,且由于非确定性计算的原因,这两个计算结果不一样; 这就会造成event不是EOMP的问题; (不仅在物理上计算了2次, 在效果上也影响了2次下游的计算, 打破的effective process once的要求)
d: 要求记录event处理的进度, 并保证储存计算结果不出现重复; 记录event处理的进度, 使得trigger本次计算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event处理完毕, 可以发下一个了), 来避免计算的re-trigger; 这要求以下策略2选一
记录event处理的进度, 和把计算结果存在高可用存储里的操作是一个原子操作, 要么一起成功, 要么一起失败; 这种策略可以保证当计算结果储存下来, 此计算不会replay了;
或者存储计算结果是一个幂等操作,那么可以先存计算结果,再记录event处理进度,一旦计算计算结果成功但是记录event处理进度失败,重新计算上游的同一个event并储存计算结果也不会引起问题。
否则要么计算没存event就被屏蔽掉了, 要么多次计算结果存储在DataStore里造成下游的重复信息; 注意, 此时下游是无法分辨这是重复信息的, 因为这是datastore里的"2条的记录", 将会获得不同的message id;
幂等和end2end argument: 所以实现原子操作就不需要幂等了么? 是也不是, 在业务层是的, 比如要实现业务层的幂等,我们可以在存计算结果到datastore里的时候把一个与触发本次计算的event的唯一id记录在一起,这样我们每次存的时候就可以使用乐观锁的方式test-and-set, 来保证如果这个id在数据库里没有才插入(取决于业务,我们也可 以用这个id当主key来,那么即使多次写入同样的内容也没关系=>要求计算是deterministic的;) 如果我们保证触发计算的event的"屏蔽"和计算结果的储存是一个原子操作,那么我们就不需要上边这种复杂的存储策略,因为一旦计算结果存储成功,触发计算的event必定被"屏蔽"掉了, 而如果没存储成功, 则event一定会replay来重试;
然而在传输层却不是的,比如储存数据库的tcp有可能丢包重发,依靠tcp的传输层id自动去重,实现tcp的幂等;
策略2: 完全依赖重算
高可靠重发的问题是,所有信息都必须先记录在高可用性的DataStore里, 相对于重新计算,重发需要的网络IO, 存储,状态管理的cost是很高的;而如果触发计算的event可以replay的话(其实不管重算还是不重算,为了防止“刚接到event, 计算节点就挂掉的情况”, 我们都要求event可以replay), 我们就可以选择重算然后重发来代替存储计算结果的重发;重算需要2个条件:
计算需要是 deterministic 的,用完全一样的数据,必须算出完全相同的结果,否则,当计算结果所需要发送的逻辑下游是由计算结果所决定的情况下(比如按照计算结果的key发送给下游不同的partition) 那么non-deterministic的重算有可能把计算结果发给不同的下游,这样如果重算发生时,下游(假设是节点A)其实已经成功处理完毕重算前上游发送的信息, 只是ACK丢失, 那么重算的结果却发送给了另外一个(节点B), 那么就会造成一个event造成了2个下游effect的结果, 引起一个event造成2次下游影响的结果, 违反EOMP的原则;
重算之前, 状态需要rollback到没有计算之前, 否则会影响需要状态的计算的结果正确性,如果状态更新非幂等,本次计算所做的状态更新也会更新多次;详见"加入节点状态的三节点间的EOMP"
(在多节点流计算里,要求上游可以重发,意味着上游把计算结果存下来了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重发,那么上游的上游可以用储存的结果重发或者重算。。。以此类推)
(2种策略其实都有可能造成重算,也都对event replay有需求;为什么还要浪费资源去存储计算结果呢?这里边的重要区别是,当储存结束时,对触发本次计算的上游event的依赖结束了,而不稳定的下游不会造成额外的重算, 和对上游, 上游的上游....计算的"链式反应", 详见流的EOMP中的讨论)
加入节点状态的三节点间的EOMP
带状态的计算, 比如流计算的某中间节点需要统计总共都收到多少信息了, 每次从上游收到新信息, 都把自己统计的当前历史信息总数更新并发往下游节点, 那么这个"系统的历史信息"就是这个"统计消息总数"的逻辑节点的状态; 由于状态也需要高保活,所以它也一定需要储存在远端dataStore里, 这样储存状态的远端datastore就相当于一个特殊的下游; 不同点在于, 当采用策略2:重算, 而不存储中间计算结果的话, 重算时则需要datastore可以把它所记录的状态rollback到最初刚开始处理此event的那个点; 这里我们只能rollback, 而不能只是依靠幂等来保证“状态的更新是exactly once”的原因是, 节点在处理任意消息时的状态也和当前信息的数据一样是本次计算的input, 而更新后的状态则是本次消息处理的output, 如果重算时不rollback节点的状态, 那么我们就会用一个被本消息"影响过"的状态来进行计算, 而这是会违反exactly once msg processing语义的; 比如节点的本地状态是上次收到的信息的数据上记录的时间戳, 节点的运算是计算2个event数据之间的时间戳差距; 假设eventA发生在时刻0, eventB发生在时刻10, 那么eventB引发的计算应往下游发送10, 并把节点的 本地状态更新为10, 此时如果eventB的这个计算需要重算, 但是我们不rollback状态10回到0的话, eventB重算所得的结果就会变成0;
注意: 由于state更新也是处理event的"下游", 那么计算过程中的所有状态更新都可以算作“计算结果”的一部分, 所以当我们需要储存计算结果时,则需要把
(1)状态更新储存回高可靠的statestore里,
(2)记录event处理进度,
(3)把计算结果存在高可用存储里,
这3个操作作为一个原子操作(以后我们称之为"原子完成"来省略篇幅); 而任何时候需要重算的话, 状态必须恢复到处理event之前的样子。
加入state,我们需要把(d. 要求记录event处理的进度, 并保证储存计算结果不出现重复, 更改为 (d+. 要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复,
并加入要求(e. state需要在replay 上游event的时候rollback到处理event之前时的状态
这些要求稍有抽象,让我们看一下流系统一般怎么达成这些要求;
流系统的EOMP
考虑一个多节点的流系统,如果我们把上游所发来的计算结果当成前边所说的“触发计算的event”,而自己的发给下游的计算结果msg作为触发下游计算的event;那么我们就可以用上边的模型保证两两节点之间的exact once msg processing,从而最终实现端到端的exact once msg processing; 这就是Google MillWheel(DataFlow) 和Kafka Stream的解决方案; 他们都选择把每个节点的计算结果储存起来,并保证即使non-deterministic的计算, 也只有一次的计算会起作用, 而不会出现(策略2-1中提到的non-deterministic造成的不一致);他们的区别是
如何实现state和
如何实现接收端去重
如何实现“原子完成”
Google MillWheel(DataFlow)
每个节点维护一个用来去重的"已处理msgId"集, 从上游收到新msg之后, 检查去重 (b.下游去重)
开始计算, 所有的状态更新写在本地, 由于一个状态只有一个更新者(本计算), 所以可以在本地维护一个状态的view, 所有的更新只更新本地的view而暂时不commit到"remote的高可用DataStore", MillWheel用的BigTable;
计算完毕后, (1).所有的要发送的计算结果,(有一些可能是在计算过程中产生并要求发送的, 都会cache起来), (2)所有的state的所有更新, (3) 引发计算的msgId, 会用一个atomic write写在BigTable里。(a.要求结果高可用, d+.要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复)
当commit成功之后, ACK上游, 而由于上游也采用commit计算结果到BigTable里的策略,且只有当自己(这里)发出的消息ACK之后, 才会允许 garbage collection回收计算结果占用的存储, 所以在收到ACK之前, 上游的计算结果, 也就是当前计算所需要的msg, 都可以重发,直至本节点计算成功且commit结果 (c. 要求触发本次计算的event可以replay)
一旦计算过程中failure发生(比如机器挂了), 会在另外的host上重启本process节点,从BigTable恢复本地state和"用来去重的已处理msgId集", 由于上次计算的结果还没有commit, 所以满足(e. state需要在replay event的时候rollback到处理event之前时的状态)
新启动的运算节点在load本地状态之前先用自己的sequencer废掉现存的sequencer, 这样BigTable就可以block zombie计算节点的写;
Kafka Stream
Kafka Stream是建立在kafka分布式队列功能上个一个library, 所以在介绍kafka Stream之前, 我们先来讲一下Kafka
简单介绍Kafka Topic
Kafka的topic可以看作一个partition的queue, 通过发给topic时指定partition(或者用一个partitioner 比如按key做hash来指定使用那个partition), 不同的key的消息可以发送到不同的partition, 相同key的message则可以保证发送到同一个partition去, top ic里的信息可以靠一个确定的index来访问, 就好像一个数据库一样,所以只要在data retention到期之前,consumer都可以用同一个index来访问之前已经访问过的数据;
Kafka Transactional Messaging
前边说过, Kafka Stream是建立在kafka分布式队列功能上个一个library, 主要依靠kafka的Transactional Messaging来实现end2end exactly once msg processing;
Transactional Messaging是指用户可以通过类似以下code来定义哪些对kafka topic的写属于一个transaction, 并进一步保证tx的atomic和Isolation
producer.initTransactions(); try { // called right before sending any records producer.beginTransaction(); //sending some messages... // when done sending, commit the transaction producer.commitTransaction(); } catch (Exception e) { producer.close(); } catch (KafkaException e) { producer.abortTransaction(); }
Kafka transaction保证了, beginTransactions之后的, 所有往不同Kafka topic里发送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作废, 从而不为read-committed的consumer所见;
而kafka stream通过用kafka本身的分布式queue的功能来实现了state和记录处理event进度的功能; 因为
(1) 所有的要发送的计算结果(由于可以允许计算发不同消息给多个下游,所以可能发给不同的topic和partition)
(2)记录input event stream的消费进度,
(3)所有的state的所有更新,
这3点, Kafka Stream都是用写消息到kafka topic里实现的, (1)自不必说,本来就是往topic里写数据,(2)其实是写当前consume position的topic; (注意Kafka Stream的上下游消息传递考的是一个中间隐藏的Kafka topic, 上游往这个topic写, 下游从这个topic读, 上游不需要下游的ack,只要往topic里写成功即可, 也不需要管下游已经处理到那里了;而下游则需要维护自己"处理到那里了"这个信息,储存在consume position的topic, 这样比如机器挂掉需要在另外的host上重启计算节点,则计算节点可以从记录consume position的topic里读出自己处理到那里然后从失败的地方重洗开始) (3)其实是写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state); failover的时候, 之前的"本地"表丢失没关系, 可以冲change log里恢复出失败前确定commit的所有state;
(1)(2)(3)的topic都只是普通的Kafka topic; 只不过(2)(3)由Kafka Stream自动创建和维护(一部分用来支持高层API的(1)也是自动创建)
开始计算时, 在从上游的topic里拿msg之前, Kafka Stream会启动一个tx, 然后开始才开始计算, 此时tx coordinator会分配一个新的epoch id给这个producer并且以后跟tx coordinator通讯都要附带这个epochId
Kafka Stream的计算节点的上游信息都来自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有确定的某种sequenceId, 所以只要计算节点记录好自己当前处理的sequenceId, 处理完一个信息就更新自己的sequenceId到下一个record, 且commit到可靠dataStore里, 就绝对不会重复处理上游event, 而只要没有commit这个位置则可以无数次replay当前的record; (b.下游去重, c. 要求触发本次计算的event可以replay)
在tx内部,每从上游topic里读一条信息就写一条信息到记录consume position的topic里, 每一个state的更改都会更新到本地的state(是一张表)里,且同时写在隐藏的changelog里; 计算过程中需要往下游发信息则写与下游联系的topic;
计算结束后, commit本次的tx, 由Kafka Transactional Messaging来保证本次tx里发生的所有(1)往下游发的消息, (2) 记录input event stream的消费进度,(3)所有的state的所有更新是一个原子操作, 由于结果都成功写入kafka topic,所以达到计算结果的高可用性 (a.要求结果高可用, d+.要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复)
计算过程中出现failure(比如机器挂了), 那么当计算重启,会重新运行initTransactions来注册tx, 此时tx coordinator会分配一个新的epoch id给此producer, 并且从此以后拒绝老的epoch id的任何commit信息来防止zombie的计算节点; tx coordinator同时roll back(如果上一个tx已经在prepare_commit状态, 继续完成transaction, 具体看下边Transactional Messaging这个章节); 如果rollback,那么input的处理进度, 状态的更改和往下游发送的信息都会rollback, 那么计算可以重新开始,就好像没有上次fail的失败一样; 如果上一个tx已经prepare_commit, 那么完成所有信息的commit; 此时当initTransactions返回,当前计算会接着上一个tx完成的进度继续计算;(e. state需要在replay event的时 候rollback到处理event之前时的状态)
Idempotent producer幂等producer主要解决这么一个问题: Kafka的消息producer, 也就是往Kafka发消息的client 如果不幂等, 那么因为Kafka的接受消息的broker和producer之间在什么是“重复信息”上没有共识的话,则broker无法分辨两个前后一模一样的消息, 到底是producer的本意就是要发两次,还是由于producer的重发(比如:producer在收到broker的"接受成功"的ack之前就挂了,所以不知道之前的消息有没有成功被broker接收, 因此重启后重发了此信息)。此时broker只能选择接受消息,这就造成了同一个消息的多次接受;
同时我们也要解决zombie producer的问题: 如果我们保证producer高可用, 重启我们认为fail掉的producer, 那么其实没死的zombie producer的信息则会造成,重复且乱序的发布消息; (由于zombie的存在, 会有2个producer同时发布我们以为只有一个producer会按顺序发布的消息,这样就无法保证顺序: 比如zombie在发送A, B, C...的时候, 新启动的producer也开始发送A, B, C... )
Kafka的解法:
用一个producer指定的固定不变的transactional.id(非自增id,叫producerName可能更好)来识别可能会在不同机器上重启的同一个logical producer; 相当于给producer起了一个logical name;
注册transaction.id来开始session, 而在session里此tx发来的消息都可以通过维护一个sequenceid来dedup.
非正常结束tx的话, 比如机器挂了, producer重启, 那么就会再次注册自己的transaction.id, 则标志前一个session失效, 而所有属于上一个session的信息全部作废(具体看下一节Atomic and Isolation), 这样就可以做到producer的zombie fencing
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])Atomic and IsolationProducer Zombie fencing: 注册transaction.id会申请高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何写操作 (e.g. tx coordinator关闭tx);. Zombie fencing in confluent.io/blog/trans
多个Tx coordinator跑在kafka broker里, 写是按照tx.id hash给不同的Tx coordinator, 每一个tx coordintor负责subset的transactionlog的partition; 这样保证同一个logic produce启动的tx必定连接同一个tx coordinator; tx coordinator保证所有的状态都在的高可用高一致性的写在tx log里, (且用queue zombie fencing来保护自己的状态一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再联系,那么可以一直跟broker发垃圾信息... P39in [26])
Producer注册新的tx之后,在给任意topic的任意partition发消息之前,先跟tx coordinator注册这个partition,
当写完毕,producer给tx coordinator发commit,tx coordinator执行2PC,在transaction log里写prepare_commit, 这样就一定会commit了,因为producer 通知commit就代表所有的写已经写成功了, 这一步其实只是把决定记下来;
Tx coordinator联系所有的注册过的topic的partition,写一个commit marker进去。
当所有的marker写完,在transaction log里记录commit complete。
注意:当在第一步tx coordinator在发现新的重复transaction.id来注册时,会检查有没有相同的transaction.id下未关闭的tx,有的话发起rollback,先在transaction log里记下rollback的决定,然后联系所有的注册过的topic的partition, 写入一个ABORT marker;而如果此tx的状态已经时prepare_commit了,那么有可能tx coordinator在下边第6步联系所有partition来commit中间挂掉了,那么要接着完成这个commit过程;即roll forward而不是roll back,
Read_commit等级的consumer需要等待transaction有结果,consumer library读到任何与Transactional Messaging相关 信息,就开始进入cache阶段,并不会运行任何consumer端的计算,只有当读到commit mark,则把cache住的record依次交给consumer端的计算,而当读到ABORT mark,则把相关tx的record全部filter掉;注意: pending的tx会block所有Read_commit等级的consumer对topic的读;
在保证两两节点之间的EOMP来实现整个流的EOMP的模型里,如果我们某一个或多个节点的状态和计算结果根本不记录在高可用DataStore里,我们还是可以实现EOMP, 我们只需要(1)replay这个节点的上游来重算这个节点的状态和发给下游的计算结果, (2)下游去重;
如果上游也没计算结果记, 那么replay上游的上游即可, 如果上游的上游也没记....一直追溯到记录了计算结果的上游节点即可;
如果一直都没有failure,那么比起Dataflow和Kafka Stream那种记录所有计算结果的模型 我们少记录一些额外的计算结果和状态就减少了很多系统负载; 这就是重算与记录计算结果模型的结合;
重算与记录计算结果的结合考虑 A,B,C, D 4个节点, A的计算结果传给B, 而B则把一部分计算结果给C一部分给D, 如果B没有记录自己的output, 则Cfail掉之后需要replay上游的input,这就需要B的一些重算来重新制造C所需要的input, 即使B的input(即A)记录了所有的计算结果, 我们还需要"恰巧可以产生这些历史计算结果的"B的历史状态,才能重算出C所需要的input; (所以B必须保存历史状态或者用某种方法重建自己的历史状态才能保证可以重算C所需要的input)
如果C的状态也丢失了, 那么对上游的负担则更重些, B需要重新计算来提供所有的历史计算结果(即C的所有历史input)来让C重建自己的历史状态
可以看到, 任意一个节点的某状态S(n+1)是
(1)上一个历史状态S(n), 和
(2)从历史状态S0建立开始所接收到的信息M(n),
同时作为输入而得到的输出; 而这个过程中又会向下游发出一些计算结果O(n+1)
所以M(n) + S(n) => S(n+1) + O(n+1), 当下游crash重启需要O(n+1)时, 我们则有2种选择:
1.记录O(n+1),
2.不记录O(n+1)但是记住, O(n+1)是根据什么数据生成的
1.是记录计算结果, 2是重算; 两者并用的好处在于, 1可以异步batch进行而不需要节点必须等待O(n+1)记录成功才往下游发送O(n+1); 而2保证了当1还没有完成时, 系统也有足够的信息可以重建O(n+1);
这是一个链式反应, 当重算需要M(n)和S(n)时, 而如果M(n)并没有存则需要上游重算M(n), 上游还没存这些重算M(n)的信息则需要replay上游的上游来重算这些信息,这就是所谓的链式反应...;最极端的情况是什么都没存,那么需要从头开始跑我们的stream程序;
可以看到, 如果没有存中间计算结果或者状态, 那么当这个数据被下游重算需要的时候, 需要我们重算这个数据, 这就会产生对上游的计算结果或者状态的需求, 这就要求我们如果不存下这些数据, 我们就需要记住计算这个数据的数据依赖图, 所以要么把"中间"数据和状态存起来待用, 要么记住他们的数据依赖图; 而这些记录的中间结果只有当对其的所有依赖从计算图中消失时, 我们才可以垃圾回收/删除这些数据(比如所有基于某状态的计算结果都已经存下来了, 那么这个状态的数据就可以删除, 再比如某计算结果所引发的下游计算结果和状态都已经存下来了, 那么此计算结果的数据就可以删除了);从而不会造成储存数据爆炸;
这, 也就是Spark Streaming的解法;
SparkSpark有三种Stream...
(1)快要被deprecate掉的DStreaming [10, 14]
(2)新一代为了弥补和Flink之间差距的, 支持event time的Structural Streaming(可惜还是有很多不足, 具体的不同和哪里有不足, 要留到对比各个系统对event time和windows 操作的支持的对比, 也就是下篇来详细描述了) [12,13]
(3)实验中的Continuous Streaming(Spark Continuous Processing) [11, 20]
(3)还在实验状态, 基本上是把底层都改掉来使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似还有很多问题需要解决所以目前不支持EOMP, 这里不多聊了;
根据Structural Streaming的论文[12], (2)和(1)使用了相似的方法来保证EOMP, 但是其实作者发现(2)比起(1)还是有一些性能上的改进[21], 但是总体原则还是和(1)类似的利用一个重算关系图lineage来维护各个状态计算结果的依赖关系, 通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系(比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据), 像这样利用checkpoint, 就可以防止lineage无限增长;
但是维护关系图需要利用micro-batch来平衡"关系维护"造成的cost, 否则每一条信息的process都产生一个新状态和新计算结果的话, 关系图会爆炸式增长(用micro-batch, 可能1000条信息会积累起来当作"一个信息"发给下游, 只需要在关系图里记录一个batch-id即可, 而不是1000个msg id, 对与状态来说也是这样,处理1000个msg之前的状态分配一个id, 处理这1000个信息之后的状态一个id, 而不需要记录1000个状态id, 同时他们之间的联系线也从1000条降低为1条;这样就大大减小了关系图维护的负担);
但这样造成的结果是micro-batch会造成很高的端到端处理的latency, 因为micro-batch里的第一条信息要等待micro-batch里的最后一条信息来了之后才能一起传给下游; 而这个等待是叠加的,当stream的层数越深,每一层的micro-batch的第一条信息都需要等待最后一条信息被处理完毕,相比在每一层都毫不等待,micro-batch造成的额外latency就会叠加式的增高;
注意, Spark Structured Stream提供了一种continuous mode[11,12,13,20]来替代micro-batch,解决了latency的问题,但是目前支持的operator很少,且不能做到exact once msg processing, 这里不多加讨论了(不过将来有望做成和flink一样的模式, 毕竟也用的Chandy-Lamport Distributed Snapshot algorithm). : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch会造成严重的latency问题, 而Dataflow和Kafka Stream的方案要求记录每一个计算结果, 则会在大大增加系统负担的同时也会有不小的latency附加; 那么有没有一种方法可以不记录所有中间计算结果, 并且也不使用micro-batch呢?
我们来看看flink的艺术;
Flink如果我们不储存流系统中间