RocketMQ特性Broker存储事务消息实现

Agnes ·
更新时间:2024-11-14
· 110 次阅读

目录

引言

TransactionalMessageService

处理事务消息

第一处:

第二处:

引言

Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); } this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(this); this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。

TransactionalMessageService类定义如下。内部属性已加注释标明。

public interface TransactionalMessageService { //用于保存Half事务消息 PutMessageResult prepareMessage(MessageExtBrokerInner messageInner); CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner); //删除事务消息 boolean deletePrepareMessage(MessageExt messageExt); //提交事务消息 OperationResult commitMessage(EndTransactionRequestHeader requestHeader); //回滚事务消息 OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader); void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener); //打开事务消息 boolean open(); //关闭事务消息 void close(); }

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。

但是有两点针对事务消息的特殊处理

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); boolean sendTransactionPrepareMessage = false; if (Boolean.parseBoolean(traFlag) && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //判断当前Broker配置是否支持事务消息 if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } sendTransactionPrepareMessage = true; } if (sendTransactionPrepareMessage) { //保存Half信息 putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } 第二处:

存储事务消息前的预处理,对应方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { //将原消息的topic保存在扩展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); //将原消息的QueueId保存在扩展字段中 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); //将原消息的SysFlag保存在扩展字段中 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); //修改Queueid为0 msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the consume queue case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; }

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

以上就是RocketMQ特性Broker存储事务消息实现的详细内容,更多关于RocketMQ Broker存储事务消息的资料请关注软件开发网其它相关文章!



broker rocketmq 事务

需要 登录 后方可回复, 如果你还没有账号请 注册新账号