RocketMQ是一个队列模型的消息中间件,具有
高性能 高可靠 高实时 分布式采用java语言开发的分布式的消息系统,阿里巴巴团队开发,2016年底贡献给apache。
模型
队列模型 主题模型或发布订阅模型在主题模型种,消息的生产者被成为发布者(Publisher),消息的消费者被称为订阅者(Subscriber),存放消息的容易被称为主题(topic)。
发布者将消息发布到指定的主题种,订阅者需要提前订阅主题才能接受特定主题的消息。
RocketMQ中的消息模型
Producer Group 生产者组:代表某一类的生产者,比如多个秒杀系统作为生产者,多个合在一起就是一个生产者组,一个生产者组通常产生相同的消息。 Topic 主题:代表一类消息,比如订单消息,物流消息等。 Consumer Group 消费者组:代表某一类的消费者,比如多个短信系统作为消费者,多个合在一起就是个一个消费者组,它们一般消费相同的消息。主题中存在多个队列,生产者每次生产消息之后是指定主题的某个队列发送消息的。
消费位移:消息被一个消费者组消费之后是不会删除的(因为其他消费者组也需要),为每个消费组维护一个消息位移。每次消费者组消费完会返回一个成功的响应,然后队列再把维护的消费位移加一,这样就不会出现消费被重复消费。
为什么一个主题中要维护多个队列?提高并发能力。
RocketMQ在一个topic中配置多个队列并且每个队列维护每个消费者组的消费位置,实现了主题模式/发布订阅模式。
RocketMQ架构图
Broker:负责消息的存储,投递和查询以及服务高可用保证,就是消息队列服务器。生产者生产消息到Broker,消费者从Broker拉取消息并消息。主从部署(master/slave),slave当时从master同步数据,master宕机,slave提供消费服务,但不能写入信息。
NameServer:注册中心。Borker管理和路由信息管理。Borker将自己的信息注册NameServer,生产者和消费者定期查询相关的Broker信息。作用:解耦,多个Broker保持负载均衡。通过单个Broker和所有的NameServer保持长连接,并且每隔30秒Broker会向所有的NameServer发送心跳,心跳包括自身的Topic配置信息。
Producer:消息发布的角色,支持分布式集群方式部署,生产者。生产者需要向Broker发送消息的时候,需要先从NameServer获取关于Broker的路由信息,然后通过轮询的方法去向每个队列中生产数据以达到负载均衡的效果。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。消费者通过NameServer获取所有Broker的路由消息后,向Broker发送pull请求来获取消息数据。Consumer支持两种方式启动-广播(Broadcast)和集群(Cluster)。广播模式下一个消息会发送给同一个消费组中的所有消费者,集群模式下消息只会发送会一个消费者。
顺序消费:普通顺序(大多数情况)和严格顺序(集群中由一台宕机则整个集群不可用,主要场景用于binlog同步),Hash取模法???
重复消费:幂等:(任意多次执行所产生的影响均与一次执行的影响相同),结合具体业务,redis(天然支持幂等),数据库插入法???
分布式事务
事务要么都执行要么都不执行。比较常见的分布式事务实现有2PC,TCC和事务消息(half半消息机制)???
RocketMQ中使用的是事务消息加上事务反查机制,
消息堆积问题
限流降级,判断是否出现大量错误,
增加消费者实例同时增加每个主题的队列数量,因为一个队列只会被一个消费者消费。
回溯消费
消费被消费后仍需要保留,重新消费一般是安装时间维度。RocketMQ支持按时间回溯消费,可精确到毫秒。
RocketMQ的刷盘机制
同步刷盘和异步刷盘
同步复制和异步复制
同步刷盘和异步刷盘是在单个结点层面的,同步复制和异步复制是Broker主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。
存储机制:
commitLog:消息主体以及元数据的存储主体,存储Producer
端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。由于RocketMQ
是基于主题 Topic
的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog
文件中根据 Topic
检索消息是非常低效的。Consumer
即可根据 ConsumeQueue
来查找待消费的消息。其中,ConsumeQueue
(逻辑消费队列)作为消费消息的索引,保存了指定 Topic
下的队列消息在 CommitLog
中的起始物理偏移量 offset
**,消息大小 size
和消息 Tag
的 HashCode
值。consumequeue
文件可以看成是基于 topic
的 commitlog
索引文件**,故 consumequeue
文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue
文件采取定长设计,每一个条目共20个字节,分别为8字节的 commitlog
物理偏移量、4字节的消息长度、8字节tag hashcode
,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue
文件大小约5.72M;
IndexFile:IndexFile
(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
内存映射机制???