一、背景
在部分业务场景中,系统需要更新存储的数据同时通过消息通知外部,而且又有一致性的需求,这就需要保证更新数据和发送消息构成一个事物,即更新数据与发送消息同时成功或同时失败。目前关系数据库已经实现事物,大多MQ也均已实现事物消息,所以这篇文章调研当前MQ的事物消息现状、使用和设计及实现。目前主要使用Apache RocketMQ(下文简称RocketMQ、RMQ)和QMQ,所以只分析这两种。
二、现状
目前主要使用RocketMQ,也在尝试使用QMQ,所以主要调研了这两种MQ的事物消息。目前使用Apache RocketMQ 4.0.0-incubating,此版本不支持事物消息。Apache RocketMQ从4.3.0版本后开始支持事物消息。QMQ目前使用1.1.14,QMQ1.1.0版本后支持事物消息。
三、设计及实现
RocketMQ4.3.0+事物消息设计
RocketMQ 事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,交互设计如下图所示。
- 应用程序在事务内完成相关业务数据落库后,需要同步调用RocketMQ客户端消息发送接口,发送状态为prepare的消息。
- RocketMQ 在收到类型为 prepare 的消息时, 会首先备份消息的原主题与原消息消费队列,然后将消息存储在主题为 RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中。
- prepare发送成功后, RocketMQ客户端根据服务端响应的发送结果做处理,如果发送结果为成功则回调应用程序记录消息的发送成功标记,该标记与本地业务操作同属一个事务,确保消息发送与本地事务的原子性。
- 客户端根据回调应用程序返回的本地事物状态回馈服务端事物状态为提交、回滚消息、未知。
- 服务端收到回馈的事物状态如果是提交或回滚, 则消息服务器提交或回滚消息,如果是未知时进行补偿处理。RocketMQ 消息服务器开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC 的消息,向消息发送端(应用程序)发起消息事务状态回查。
- 应用程序根据保存的事务状态回馈消息服务器事务的状态(提交、回滚、未知),如果是提交或回滚, 则消息服务器提交或回滚消息,如果是未知,待下一次回查, RocketMQ 允许设置一条消息的回查间隔与回查 次数,如果在超过回查次数后依然无法获知消息的事务状态, 默认回滚消息。
RocketMQ4.3.0+事物消息实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| //事物消息生产者,用来发送事物消息
org.apache.rocketmq.client.producer.TransactionMQProducer
//发送事物消息
TransactionMQProducer.sendMessageInTransaction(org.apache.rocketmq.common.message.Message, java.lang.Object)
//事物监听器
org.apache.rocketmq.client.producer.TransactionListener
//prepare消息发送成功时回调应用程序
org.apache.rocketmq.client.producer.TransactionListener.executeLocalTransaction
//消息服务器回查未知事物状态的prepare消息的状态
org.apache.rocketmq.client.producer.TransactionListener.checkLocalTransaction
|
QMQ事物消息设计
QMQ事物消息设计上它本身在消息协议设计中没有区分普通消息和事物消息,而是依赖关系数据库同一服务中不同数据库事物的强一致性以及Spring事物管理,另外增加WatchDog服务来补偿发送失败的消息重发。
- 应用程序在事务中业务数据落库,需要同步调用QMQ客户端发送消息。
- 客户端会判断如果是事物消息会先保存再内存中,本地事物提交前会回调客户端存储内存中的消息到数据库中。
- 本地事物提交后会回调客户端的事物监听。
- 客户端将内存中的消息依次发送到Broker,发送成功会删除数据库消息。
- WatchDog会定时检查业务数据库消息表中记录的消息并重新发送。
- 对于一定时间间隔内且没超过最大发送错误次数的消息会重新发送到Broker
QMQ事物消息实现
WatchDog服务:也称补偿任务,检查配置的所有业务数据库中消息表中的消息,并重新发送。
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| //QMQ事物监听接口,定义了事物的所有阶段包括开始事物、提交前、提交后、完成后等回调方法,还有事物中增加消息的方法
qunar.tc.qmq.TransactionListener
//QMQ默认实现的事物监听接口,实现增加消息到内存,提交前保存消息到数据库、提交后发送消息
qunar.tc.qmq.producer.tx.DefaultTransactionListener
//消息存储接口,定义了插入消息、完成消息、开始事物、结束事物方法
qunar.tc.qmq.MessageStore
//默认消息存储实现,使用JDBC增删数据库消息
qunar.tc.qmq.producer.tx.spring.DefaultMessageStore
//事物提供者接口,定义了是否事物中、设置事物监听器、消息存储器
qunar.tc.qmq.TransactionProvider
//默认Spring事物管理实现,使用Spring事物管理器实现事物监听
qunar.tc.qmq.producer.tx.spring.SpringTransactionProvider
//消息生产者,发送事物消息需要设置TransactionProvider
qunar.tc.qmq.producer.MessageProducerProvider
|
四、使用
RocketMQ4.3.0+事物消息
https://github.com/apache/rocketmq/blob/master/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
|
QMQ事物消息
https://github.com/qunarcorp/qmq/blob/master/docs/cn/transaction.md
五、结论
RocketMQ4.3.0一下版本暂不支持事物消息,可以选择与QMQ相同的方法通过Spring事物管理来实现消息的存储于发送,通过定时任务来补偿发送失败的消息。
RocketMQ4.3.0及以上版本支持两阶段提交的事物消息,可进一步封装事物监听实现类似QMQ在数据库中初始化消息表,在执行本地事物阶段将消息入库,在回查事物状态时从数据库中查询消息状态。
参考
https://github.com/apache/rocketmq/tree/master/docs/cn
qmq详解 https://blog.csdn.net/csdnnews/article/details/9925682