Seata 框架本身并没有内置针对可靠事件模式的解决方案,但我们可以使用另一款已经介绍过的框架来实现这一目标,就是 RocketMQ。
RocketMQ 为开发人员提供了事务消息这一消息类型,专门用来应对分布式环境下的数据一致性问题。
事务消息的基本概念
事务消息是RocketMQ提供的一种高级消息类型,支持在分布式场景下消息生产和本地事务的最终一致性。我们可以分别从生产者和消费者维度出发来分析可靠事件实现上的需求。
事务消息的出现完美解决了可靠事件模式执行过程中可能出现的问题。事务消息提供了类似X/Open XA的分布事务功能,通过事务消息能达到分布式事务的最终一致性。
那么,RocketMQ 是如何做到这一点的呢?关键就在于它所提供的半消息机制。
所谓半消息(Half Message),是指暂不能投递的消息。发送方已经将消息成功发送到了服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息就是半消息。
介绍完半消息的概念,我们再来明确什么是半消息回查。
我们知道由于网络闪断、生产者应用重启等原因,可能会导致某条事务消息的二次确认丢失。RocketMQ 服务端通过扫描发现某条消息长期处于半消息状态时,就会主动向消息生产者询问该消息的最终状态(Commit 或 Rollback),这一过程就是半消息回查。图 1 展示了 RocketMQ 中事务消息的整体架构。
进一步,我们梳理 RocketMQ 事务消息的执行过程,如图2所示。
可以看到,图 2 存在服务A和服务B这两个微服务。其中服务 A 是消息发布者,而服务 B 是消息消费者,我们需要确保两者之间数据的一致性。这里有 7 个步骤。
图 2 更多是站在消息发布者的角度看待事务消息的发布流程。而针对消息消费而言,如果消费者处理事务消息时出现异常,RocketMQ 会进行重试操作,直到消息消费和本地事务处理都成功。这是一种回调机制,会被 RocketMQ 自动调用。
事务消息开发模式
介绍完 RocketMQ 事务消息的基本概念和执行流程之后,我们接着介绍它的开发模式。
实现消息发布者
当我们在微服务架构中引入事务消息之前,需要创建一张事务执行记录表。事务执行记录表的作用有两个:一个是实现事务回查,另一个则是实现业务层幂等控制。
事务执行记录表的创建脚本如以下代码所示。
代码清单1 事务执行记录表 SQL 定义代码
CREATE TABLE `tx_record` (`tx_no` varchar(64) NOT NULL COMMENT '事务Id',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事务记录表'
接下来我们要引入 RocketMQ 内置的TransactionListener接口。
为了实现事务消息,开发人员的主要开发工作量就体现在对这个接口的实现过程中。TransactionListener接口的定义如下所示。
代码清单2 TransactionListener接口定义代码
public interface TransactionListener {//当发送事务消息成功之后,该方法会被触发,本地事务将被执行LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);//当没有收到事务消息的响应时,服务器会发送确认消息来检查事务状态,该方法会被触发并获取本地事务状态LocalTransactionState checkLocalTransaction(final MessageExt msg);}
可以看到,TransactionListener接口的两个方法分别完成了本地事务执行和本地事务回查这两个核心操作。那么我们应该如何实现这两个方法呢?这里给出这两个方法的执行伪代码。
代码清单3 TransactionListener接口两个方法实现伪代码
executeLocalTransaction { 执行本地事务 如果失败就选择回滚事务,反之提交事务}checkLocalTransaction { 实现事务回查 根据事务执行记录判断,已执行则提交事务}
注意:这两个方法需要消息的发布者来实现,但调用方是 RocketMQ 自身,而且这个调用过程是自动触发的,不需要开发做任何干预。
图 3 围绕消息发布者展示了其所需要实现的各个核心步骤。
图3 事务消息中消息发布者实现过程
如果我们使用 Spring 框架来集成 RocketMQ,那么图 3 中的业务服务实现类的实现过程可以参考如下代码示例。
代码清单3 消息发布端业务服务实现类示例代码。
@Servicepublic class CustomerTicketServiceImpl implements ICustomerTicketService {@AutowiredTxRecordMapper txRecordMapper;@AutowiredRocketMQTemplate rocketMQTemplate;@Overridepublic void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) {//从VO中创建TicketGeneratedEventTicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO);//将Event转化为JSON对象JSONObject jsonObject =new JSONObject();jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent);String jsonString = jsonObject.toJSONString();//生成消息对象Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送事务消息rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null);}@Override@Transactionalpublic void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) {//幂等判断if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){return ;}//插入工单CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent);customerTicket.setStatus(1);save(customerTicket);//添加事务日志txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());}...}
上述代码展示的是一个插入客服工单(CustomerTicket)的过程,generateTicket和doGenerateTicket方法分别对应图 3 中的发送消息和执行本地事务这两个环节。
注意:这里使用了RocketMQTemplate的sendMessageInTransaction方法来发送事务消息。同时,我们也看到了事务执行记录表的一种应用场景,即实现业务层幂等控制。
接下来继续实现图3所示的TransactionListener接口,示例代码如下:
代码清单4 TransactionListener接口实现类示例代码。
@Component@RocketMQTransactionListener(txProducerGroup = "producer_group_ticket")public class ProducerListener implements RocketMQLocalTransactionListener {@AutowiredICustomerTicketService customerTicketService;@AutowiredTxRecordMapper txRecordMapper;//事务消息发送后的回调方法,当消息发送给MQ成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析消息,转成Event对象TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);//执行本地事务customerTicketService.doGenerateTicket(ticketGeneratedEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向MQ发送commit消息,MQ将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();//如果本地事务执行失败,就将消息设置为回滚状态return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析消息,转成Event对象TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message);//根据事务Id判断是否存在已执行的事务Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()));//如果事务已执行则返回COMMIT,反之返回UNKNOWN状态if(isTxNoExisted){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}...}
这段代码清晰地展示了TransactionListener接口中两个核心方法的实现过程。在executeLocalTransaction方法中,我们通过调用CustomerTicketService业务服务类的doGenerateTicket方法完成了本地事务;而在checkLocalTransaction方法中,我们则实现了事务回查机制。这里同样展示了事务执行记录表的另一种应用场景,即实现事务回查。
实现消息消费者
类似,当使用事务消息时,消息消费者的实现过程同样遵循一定的开发规范,如图 4 所示。
图4 事务消息中消息消费者实现过程
可以看到,相比于消息发布者,消息消费者的实现过程要简单很多。
代码清单5 消息消费实现类示例代码。
@Component@RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket")public class Consumer implements RocketMQListener<String> {@AutowiredIChatRecordService chatRecordService;//接收消息@Overridepublic void onMessage(String message) {log.info("开始消费消息:{}",message);//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent");//转成TicketGeneratedEventTicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class);//添加本地聊天记录chatRecordService.generateChatRecord(ticketGeneratedEvent);}}
可以看到,这个消息消费者的实现过程没有任何特殊之处,我们只需要实现RocketMQListener接口的onMessage方法,并在该方法中调用业务服务实现类中的业务方法即可。
消费者端的业务服务实现类的实现过程如下。
代码清单6 消息消费端业务服务实现类示例代码:
@Servicepublic class ChatRecordServiceImpl implements IChatRecordService {@AutowiredTxRecordMapper txRecordMapper;@Override@Transactionalpublic void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) {//幂等判断if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){return ;}//插入聊天记录ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent);save(chatRecord);//添加事务日志txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo());}}
这里同样通过事务执行记录表实现了业务层幂等控制,并最终完成本地事务的提交。
作为总结,我们使用时序图来详细展示事务消息发送和消费过程,如图 5 所示。
图5 事务消息发送和消费时序图