浅谈消息队列

Vivienne ·
更新时间:2024-09-21
· 553 次阅读

消息队列列的⼀一些要点:服务质量量(QOS)、性能、扩展性等等,下⾯面⼀一⼀一探索这些概念,并谈谈在特定 的消息队列列如kafka或者mosquito中是如何具体实现这些概念的。

服务质量量
服务语义
服务质量量⼀一般可以分为三个级别,下⾯面说明它们不不同语义。
At most once
⾄至多⼀一次,消息可能丢失,但绝不不会重复传输。 ⽣生产者:完全依赖底层TCP/IP的传输可靠性,不不做特殊 处理理,所谓“发送即忘”。kafka中设置 acks=0 。 消费者:先保存消费进度,再处理理消息。kafka中设置 消费者⾃自动提交偏移量量并设置较短的提交时间间隔。


At least once
⾄至少⼀一次,消息绝不不会丢,但是可能会重复。 ⽣生产者:要做消息防丢失的保证。kafka中设置 acks=1或 all 并设置 retries>0 。 消费者:先处理理消息,再保存消费进度。kafka中设置消费者⾃自动提交偏 移量量并设置很⻓长的提交时间间隔,或者直接关闭⾃自动提交偏移量量,处理理消息后⼿手动调⽤用同步模式的偏移 量量提交。


Exactly once
精确⼀一次,每条消息肯定会被传输⼀一次且仅⼀一次。 这个级别光靠消息队列列本身并不不好保证,有可能要依 赖外部组件。 ⽣生产者:要做消息防丢失的保证。kafka中设置 acks=1 或 all 并设置 retries>0 。mosquito中通过四步握⼿手与DUP、MessageID等标识来实现单次语义。 消费者:要做消息防重复的保 证,有多种⽅方案,如:在保存消费进度和处理理消息这两个操作中引⼊入两阶段提交协议;让消息幂等;让 消费处理理与进度保存处于⼀一个事务中来保证 原⼦子性 。kafka中关闭⾃自动提交偏移量量,并设置⾃自定义的再 平衡监听器器,监听到分区发⽣生变化时从外部组件读取或者存储偏移量量,保证⾃自⼰己或者其他消费者在更更换 分区时能读到最新的偏移量量从⽽而避免重复。总之就是结合 ConsumerRebalanceListener 、 seek 和⼀一 个 外部系统 (如⽀支持事务的数据库)共同来实现单次语义。此外,kafka还提供了了GUID以便便⽤用户⾃自⾏行行实 现去重。kafka 0.11版本通过3个⼤大的改动⽀支持 EOS :1.幂等的producer;2. ⽀支持事务;3. ⽀支持EOS的 流式处理理(保证读-处理理-写全链路路的EOS)。 这三个级别可靠性依次增加,但是延迟和带宽占⽤用也会增加, 所以实际情况中,要依据业务类型做出权衡。


可靠性
上⾯面的三个语义不不仅需要⽣生产者和消费者的配合实现,还要broker本身的可靠性来进⾏行行保证。可靠性就 是只要broker向producer发出确认,就⼀一定要保证这个消息可以被consumer获取。
kafka 中⼀一个 topic 有多个 partition ,每个 partition ⼜又有多个 replica ,所有 replica 中有⼀一 个 leader , ISR 是⼀一定要同步 leader 后才能返回提交成功的 replica集 , OSR 内的 replica 尽⼒力力 的去同步 leader ,可能数据版本会落后。在 kafka ⼯工作的过程中,如果某个 replica 同步速度慢于




replica.lag.time.max.ms 指定的阈值,则被踢出 ISR 存⼊入 OSR ,如果后续速度恢复可以回到 ISR中。可以配置 min.insync.replicas 指定 ISR 中的 replica 最⼩小数量量,默认该值为1。 LEO 是分区的 最新数据的offset,当数据写⼊入leader后,LEO就⽴立即执⾏行行该最新数据,相当于最新数据标识位。 HW 是 当写⼊入的数据被同步到所有的 ISR 中的副本后,数据才认为已提交, HW 更更新到该位置, HW 之前的数据 才可以被消费者访问,保证没有同步完成的数据不不会被消费者访问到,相当于所有副本同步数据标识 位。
每个 partition 的所有 replica 需要进⾏行行 leader 选举(依赖ZooKeeper)。在 leader 宕机后,只 能从 ISR 列列表中选取新的 leader ,⽆无论 ISR 中哪个副本被选为新的 leader ,它都知道 HW 之前的数 据,可以保证在切换了了 leader 后,消费者可以继续看到 HW 之前已经提交的数据。当 ISR 中所
有 replica 都宕机该 partition 就不不可⽤用了了,可以设
置 unclean.leader.election.enable=true ,该选项使得 kafka 选择任何⼀一个活的replica成为leader然后继续⼯工作,此 replica 可能不不在 ISR 中,就可能导致数据丢失。所以实际使⽤用中需要进⾏行行可⽤用性与可靠性的权衡。
kafka建议数据可靠存储不不依赖于数据强制刷盘(会影响整体性能),⽽而是依赖于 replica 。


顺序消费
顺序消费是指消费者处理理消息的顺序与⽣生产者投放消息的顺序⼀一致。 主要可能破坏顺序的场景是⽣生产者 投放两条消息AB,然后A失败重投递导致消费者拿到的消息是BA。
kafka中能保证分区内部消息的有序性,其做法是设
置 max.in.flight.requests.per.connection=1 ,也就是说⽣生产者在未得到broker对消息A的确认 情况下是不不会发送消息B的,这样就能保证broker存储的消息有序,⾃自然消费者请求到的消息也是有序 的。 但是我们明显能感觉到这会降低吞吐量量,因为消息不不能并⾏行行投递了了,⽽而且会阻塞等待,也没法发挥batch 的威⼒力力。 如果想要整个topic有序,那就只能⼀一个 topic ⼀一个 partition 了了,⼀一个 consumer group 也就只有⼀一个 consumer 了了。这样就违背了了kafka⾼高吞吐的初衷。


重复消费
重复消费是指⼀一个消息被消费者重复消费了了。 这个问题也是上⾯面第三个语义需要解决的。
⼀一般的消息系统如kafka或者类似的rocketmq都不不能也不不提倡在系统内部解决,⽽而是配合第三⽅方组 件,让⽤用户⾃自⼰己去解决。究其原因还是解决问题的成本与解决问题后获得的价值不不匹配,所以⼲干脆不不解 决,就像操作系统对待死锁⼀一样,采取“鸵⻦鸟政策”。


性能
衡量量⼀一个消息系统的性能有许多⽅方⾯面,最常⻅见的就是下⾯面⼏几个指标。
连接数




是指系统在同⼀一时刻能⽀支持多少个⽣生产者或者消费者的连接总数。连接数和broker采⽤用的⽹网络IO模型直 接相关,常⻅见模型有:单线程、连接每线程、Reactor、Proactor等。 单线程⼀一时刻只能处理理⼀一个连 接,连接每线程受制于server的线程数量量,Reactor是⽬目前主流的⾼高性能⽹网络IO模型,Proactor由于操 作系统对真异步的⽀支持不不太⾏行行所以尚未流⾏行行。
kafka的 broker 采⽤用了了类似于 Netty 的 Reactor 模型:1(1个 Acceptor 线程)+N(N
个 Processor 线程)+M(M个 Work 线程)。 其中 Acceptor 负责监听新的连接请求,同时注册
OPACCEPT 事件,将新的连接按照 RoundRobin 的⽅方式交给某个 Processor 线程处理理。 每
个 Processor 都有⼀一个 NIO selector ,向 Acceptor 分配的 SocketChannel 注册 OPREAD、OPWRITE 事件,对socket进⾏行行读写。N由 num.networker.threads 决定。 Worker 负责具体的业务逻 辑如:从 requestQueue 中读取请求、数据存储到磁盘、把响应放进 responseQueue 中等等。M的⼤大 ⼩小由 num.io.threads 决定。
Reactor模型⼀一般基于IO多路路复⽤用(如 select , epoll ),是⾮非阻塞的,所以少量量的线程能处理理⼤大量量 的连接。 如果⼤大量量的连接都是 idle 的,那么Reactor使⽤用 epoll 的效率是杠杠的,如果⼤大量量的连接都 是活跃的,此时如果没有 Proactor 的⽀支持就最好把 epoll 换成 select 或者 poll 。 具体做法是 - Djava.nio.channels.spi.SelectorProvider 把 sun.nio.ch 包下⾯面的 EPollSelectorProvider换成 PollSelectorProvider 。
QPS
是指系统每秒能处理理的请求数量量。QPS通常可以体现吞吐量量(该术语很⼴广,可以⽤用TPS/QPS、PV、UV、业务数/⼩小时等单位体现)的⼤大⼩小。
kafka中由于可以采⽤用 batch 的⽅方式(还可以压缩),所以每秒钟可以处理理的请求很多(因为减少了了解 析量量、⽹网络往复次数、磁盘IO次数等)。另⼀一⽅方⾯面,kafka每⼀一个topic都有多个partition,所以同⼀一个topic下可以并⾏行行(注意不不是并发哟)服务多个⽣生产者和消费者,这也提⾼高了了吞吐量量。

并发数
是指系统同时能处理理的请求数量量数。⼀一般⽽而⾔言, QPS = 并发数/平均响应时间 或者说 并发数 = QPS*平 均响应时间 。
这个参数⼀一般只能估计或者计算,没法直接测。顾名思义,机器器性能越好当然并发数越⾼高咯。此外注意 ⽤用上多线程技术并且提⾼高代码的并⾏行行度、优化IO模型、减少减少内存分配和释放等⼿手段都是可以提⾼高并 发数的。


[img]blob:http://bbs.itheima.com/179b2379-2b8a-4e5f-a7be-5d2a1977207a[/img]
扩展性

消息系统的可扩展性是指要为系统组件添加的新的成员的时候⽐比较容易易。kafka 中扩展性的基⽯石就是 topic 采⽤用的 partition 机制。第⼀一, Kafka 允许 Partition
在 cluster 中的 Broker 之间移动,以此来解决数据倾斜问题。第⼆二,⽀支持⾃自定义的 Partition 算 法,⽐比如你可以将同⼀一个 Key 的所有消息都路路由到同⼀一个 Partition 上去(来获得顺序)。第 三, partition 的所有 replica 通过 ZooKeeper 来进⾏行行集群管理理,可以动态增减副本。第四,partition也⽀支持动态增减。
对于 producer ,不不存在扩展问题,只要 broker 还够你连接就⾏行行。 对于 consumer ,⼀一个 consumer group 中的 consumer 可以增减,但是最好不不要超过⼀一个 topic 的 partition 数量量,因为多余的
consumer 并不不能提升处理理速度,⼀一个 partition 在同⼀一时刻只能被⼀一个 consumer group 中的⼀一 个 consumer 消费
代码上的可扩展性就属于设计模式的领域了了,这⾥里里不不谈。


作者:爱学习的小肥猪



队列 消息队列

需要 登录 后方可回复, 如果你还没有账号请 注册新账号