您的位置 首页 > 腾讯云社区

RocketMQ 源码分析 —— 集成 Spring Boot---芋道源码

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/spring-boot-integration/ 「芋道源码」欢迎转载,保留摘要,谢谢!

1. 概述2. 调试环境搭建3. 项目结构一览5. annotation 包6. autoconfigure 包7. config 包8. support 包9. core 包666. 彩蛋1. 概述

在开始分享 https://github.com/apache/rocketmq-spring 项目(RocketMQ 集成到 Spring Boot 中),我们先恶趣味的看一段历史:

2014-08 Spring Boot 1 正式发布。2018-03 Spring Boot 2 正式发布。2018-12 RocketMQ 团队发布 RocketMQ 集成到 Spring Boot 的解决方案,并且提供了中文文档。

在阅读本文之前,希望胖友能够先熟读 中文文档 。最好呢,当然不强制,可以操练下每个 Demo 。

2. 调试环境搭建

在读源码之前,我们当然是先把调试环境搭建起来。

2.1 依赖工具JDK :1.8+MavenIntelliJ IDEA2. 源码拉取

从官方仓库 https://github.com/apache/rocketmq-spring Fork 出属于自己的仓库。为什么要 Fork ?既然开始阅读、调试源码,我们可能会写一些注释,有了自己的仓库,可以进行自由的提交。?

使用 IntelliJ IDEA 从 Fork 出来的仓库拉取代码。拉取完成后,Maven 会下载依赖包,可能会花费一些时间,耐心等待下。

在等待的过程中,我来简单说下,搭建调试环境的过程:

启动 RocketMQ Namesrv启动 RocketMQ Broker启动 RocketMQ Spring Boot Producer启动 RocketMQ Spring Boot Consumer

最小化的 RocketMQ 的环境,暂时不考虑 Namesrv 集群、Broker 集群、Consumer 集群。

? 另外,本文使用的 rocketmq-spring 版本是 2.0.2-SNAPSHOT 。

2.3 启动 RocketMQ Namesrv

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「3. 启动 RocketMQ Namesrv」 的方式,进行启动 RocketMQ Namesrv 。

方式一,可以方便调试 RocketMQ Namesrv 的代码。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Name Server」 的方式,进行启动 RocketMQ Namesrv 。

方式二,比较方便。

2.4 启动 RocketMQ Broker

方式一,可以参考 《RocketMQ 源码解析 —— 调试环境搭建》 的 「4. 启动 RocketMQ Broker」 的方式,进行启动 RocketMQ Broker 。

需要注意的是,要删除 org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener 和 org.apache.rocketmq.broker.transaction.TransactionalMessageService 两个 SPI 配置文件,否则事务功能,无法正常使用。

方式二,也可以按照 《Apache RocketMQ —— Quick Start》 的 「Start Broker」 的方式,进行启动 RocketMQ Broker 。

2.5 启动 RocketMQ Spring Boot Producer

第一步,打开根目录的 pom.xml 文件,将 rocketmq-spring-boot-samples 示例项目的注释去掉。如下:

<!-- pom --> <modules> <module>rocketmq-spring-boot-parent</module> <module>rocketmq-spring-boot</module> <module>rocketmq-spring-boot-starter</module> <!-- Note: The samples need to mvn compiple in its own directory <module>rocketmq-spring-boot-samples</module> --> <module>rocketmq-spring-boot-samples</module> </modules>

此时,Maven 又会下载依赖包,可能会花费一些时间,耐心等待下。

第二步,右键运行 rocketmq-produce-demo 的 ProducerApplication 的 #main(String[] args) 方法,Producer 就启动完成了。输出日志如下图:

此时,可能会报 Intellij IDEA 报错:Error : java 不支持发行版本5 。可以参考 《Intellij idea 报错:Error : java 不支持发行版本5》 文章,进行解决。

2.6 启动 RocketMQ Spring Boot Consumer

右键运行 rocketmq-consumer-demo 的 ConsumerApplication 的 #main(String[] args) 方法,Consumer 就启动完成了。输出日志如下图:

? 后面,我们就可以愉快的各种调试玩耍了~

3. 项目结构一览

本文主要分享 rocketmq-spring 的 项目结构。 希望通过本文能让胖友对 rocketmq-spring 的整体项目有个简单的了解。

项目结构一览

3.1 代码统计

这里先分享一个小技巧。笔者在开始源码学习时,会首先了解项目的代码量。

第一种方式,使用 IDEA Statistic 插件,统计整体代码量。

Statistic 统计代码量

我们可以粗略的看到,总的代码量在 1700 行。这其中还包括单元测试,示例等等代码。 所以,不慌,一点不慌~

第二种方式,使用 Shell 脚本命令逐个 Maven 模块统计 。

一般情况下,笔者使用 find . -name "*.java"|xargs cat|grep -v -e ^

当然,考虑到准确性,胖友需要手动 cd 到每个 Maven 项目的 src/main/java 目录下,以达到排除单元测试的代码量。

Shell 脚本统计代码量

统计完后,是不是更加不慌了。哈哈哈哈。

3.2 rocketmq-spring-boot-parent 模块

rocketmq-spring-boot-parent 模块,无具体代码,作为其它项目的 Maven Parent 项目,例如定义了依赖版本号。

3.3 rocketmq-spring-boot-starter 模块

rocketmq-spring-boot-starter 模块,无具体代码,作为 Spring Boot RocketMQ Starter 模块。其 pom.xml 的代码如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-parent</artifactId> <version>2.0.2-SNAPSHOT</version> <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath> </parent> <artifactId>rocketmq-spring-boot-starter</artifactId> <packaging>jar</packaging> <name>RocketMQ Spring Boot Starter</name> <description>SRocketMQ Spring Boot Starter</description> <url>https://github.com/apache/rocketmq-spring</url> <dependencies> <!-- Spring Boot RocketMQ 具体实现 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot</artifactId> </dependency> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- 提供 Validation 功能 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency> </dependencies> </project> 3.4 rocketmq-spring-boot 模块

rocketmq-spring-boot 模块,1979 行代码,提供了 Spring Boot RocketMQ 的具体实现。其每个 package 包的功能,分别如下:

annotation :注解和注解相关的枚举。autoconfigure :自动配置。config :配置类。 有点难解释。等后面直接撸源码。core :核心实现。support :提供支持,例如说工具类。 有点难解释。等后面直接撸源码。3.5 rocketmq-spring-boot-samples 模块

rocketmq-spring-boot-samples 模块,435 行代码,提供示例。* rocketmq-consume-demo 模块,提供消费者示例。* rocketmq-produce-demo 模块,提供生产者示例。

艿艿:后面的小节,我们开始看具体的源码。

5. annotation 包5.1 @RocketMQMessageListener

org.apache.rocketmq.spring.annotation.@RocketMQMessageListener 注解,声明指定 Bean 是 RocketMQ 消费者的 MessageListener 。代码如下:

// RocketMQMessageListener.java @Target(ElementType.TYPE) // 表名,注解在类上 @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RocketMQMessageListener { /** * 消费分组 * * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * * * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion. */ String consumerGroup(); /** * 消费主体 * * Topic name. */ String topic(); /** * 选择消息的方式 * * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * 选择消息的表达式 * * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */ String selectorExpression() default "*"; /** * 消费模式 * * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * 消费模型 * * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * 消费线程数 * * Max consumer thread number. */ int consumeThreadMax() default 64; } 具体使用,见示例 OrderPaidEventConsumer 。selectorType 属性,org.apache.rocketmq.spring.annotation.SelectorType 枚举,选择消息的方式。代码如下: // SelectorType.java public enum SelectorType { /** * @see ExpressionType#TAG * * 标签 */ TAG, /** * @see ExpressionType#SQL92 * * SQL */ SQL92 } consumeMode 属性,org.apache.rocketmq.spring.annotation.ConsumeMode ,消费模式。代码如下: // ConsumeMode.java public enum ConsumeMode { /** * Receive asynchronously delivered messages concurrently * * 并发消费 */ CONCURRENTLY, /** * Receive asynchronously delivered messages orderly. one queue, one thread * * 顺序消费 */ ORDERLY } messageModel 属性,org.apache.rocketmq.spring.annotation.MessageModel ,消费模型。代码如下: // MessageModel.java public enum MessageModel { /** * 广播消费 */ BROADCASTING("BROADCASTING"), /** * 集群消费 */ CLUSTERING("CLUSTERING"); private final String modeCN; MessageModel(String modeCN) { this.modeCN = modeCN; } public String getModeCN() { return this.modeCN; } } 5.2 @RocketMQTransactionListener

org.apache.rocketmq.spring.annotatio.@RocketMQTransactionListener 注解,声明指定 Bean 是 RocketMQ 生产者的 RocketMQLocalTransactionListener 。代码如下:

// RocketMQTransactionListener.java @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE}) // 表名,注解在类上 @Retention(RetentionPolicy.RUNTIME) @Documented @Component // 默认带了 @Component 注解,所以只要添加到了类上,就会注册成 Spring Bean 对象 public @interface RocketMQTransactionListener { /** * 生产者分组 * * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a * transactional message with the declared txProducerGroup. * <p> * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class. */ String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME; /** * Set ExecutorService params -- corePoolSize */ int corePoolSize() default 1; /** * Set ExecutorService params -- maximumPoolSize */ int maximumPoolSize() default 1; /** * Set ExecutorService params -- keepAliveTime */ long keepAliveTime() default 1000 * 60; //60ms /** * Set ExecutorService params -- blockingQueueSize */ int blockingQueueSize() default 2000; } // RocketMQConfigUtils.java public static final String ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME = "rocketmq_transaction_default_global_name"; 6. autoconfigure 包6.1 RocketMQProperties

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties ,RocketMQ 客户端的 Properties 对象。代码如下:

// RocketMQProperties.java @ConfigurationProperties(prefix = "rocketmq") // 配置文件中 rocketmq 前缀 public class RocketMQProperties { /** * The name server for rocketMQ, formats: `host:port;host:port`. * * Namesrv 地址 */ private String nameServer; /** * Producer 配置 */ private Producer producer; // ... 省略 setting/getting 方法 public static class Producer { /** * Name of producer. */ private String group; /** * Millis of send message timeout. */ private int sendMessageTimeout = 3000; /** * Compress message body threshold, namely, message body larger than 4k will be compressed on default. */ private int compressMessageBodyThreshold = 1024 * 4; /** * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendFailed = 2; /** * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p> * This may potentially cause message duplication which is up to application developers to resolve. */ private int retryTimesWhenSendAsyncFailed = 2; /** * Indicate whether to retry another broker on sending failure internally. */ private boolean retryNextServer = false; /** * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; // ... 省略 setting/getting 方法 } } 6.2 RocketMQAutoConfiguration

org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration ,RocketMQ 自动配置类。代码如下:

// RocketMQAutoConfiguration.java @Configuration // 标识是配置类 @EnableConfigurationProperties(RocketMQProperties.class) // 指定 RocketMQProperties 自动配置 @ConditionalOnClass({ MQAdmin.class, ObjectMapper.class }) // 要求有 MQAdmin、ObjectMapper 类 @ConditionalOnProperty(prefix = "rocketmq", value = "name-server") // 要求有 rocketmq 开头,且 name-server 的配置 @Import({ JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class }) // 引入 JacksonFallbackConfiguration 和 ListenerContainerConfiguration 配置类 @AutoConfigureAfter(JacksonAutoConfiguration.class) // 在 JacksonAutoConfiguration 之后初始化 public class RocketMQAutoConfiguration { // ... 省略配置方法 } 6.2.1 defaultMQProducer

#defaultMQProducer() 方法,创建 DefaultMQProducer Bean 对象。代码如下:

// RocketMQAutoConfiguration.java @Bean @ConditionalOnMissingBean(DefaultMQProducer.class) // 不存在 DefaultMQProducer Bean 对象 @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"}) // 要求有 rocketmq 开头,且 name-server、producer.group 的配置 public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) { // 校验配置 RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String nameServer = rocketMQProperties.getNameServer(); String groupName = producerConfig.getGroup(); Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); // 创建 DefaultMQProducer 对象 DefaultMQProducer producer = new DefaultMQProducer(groupName); // 将 RocketMQProperties.Producer 配置,设置到 producer 中 producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); return producer; } 6.2.2 rocketMQTemplate

#rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) 方法,创建 RocketMQTemplate Bean 对象。代码如下:

// RocketMQAutoConfiguration.java @Bean(destroyMethod = "destroy") // 声明了销毁时,调用 destroy 方法 @ConditionalOnBean(DefaultMQProducer.class) // 有 DefaultMQProducer Bean 的情况下 @ConditionalOnMissingBean(RocketMQTemplate.class) // 不存在 RocketMQTemplate Bean 对象 public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) { // 创建 RocketMQTemplate 对象 RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); // 设置其属性 rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper); return rocketMQTemplate; } 关于 RocketMQTemplate 类,详细解析,见 「9.4 RocketMQTemplate」 中。6.2.3 transactionHandlerRegistry

#transactionHandlerRegistry(RocketMQTemplate template) 方法,创建 TransactionHandlerRegistry Bean 对象。代码如下:

// RocketMQAutoConfiguration.java @Bean @ConditionalOnBean(RocketMQTemplate.class) // 有 RocketMQTemplate Bean 的情况下 @ConditionalOnMissingBean(TransactionHandlerRegistry.class) // 不存在 TransactionHandlerRegistry Bean 对象 public TransactionHandlerRegistry transactionHandlerRegistry(RocketMQTemplate template) { // 创建 TransactionHandlerRegistry 对象 return new TransactionHandlerRegistry(template); } 详细解析,见 「7.2 TransactionHandlerRegistry」 中。6.2.4 transactionAnnotationProcessor

#transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) 方法,创建 RocketMQTransactionAnnotationProcessor Bean 对象。代码如下:

// RocketMQAutoConfiguration.java @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME) // Bean 的名字 @ConditionalOnBean(TransactionHandlerRegistry.class) // 有 TransactionHandlerRegistry Bean 的情况下 @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { // 创建 RocketMQTransactionAnnotationProcessor 对象 return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry); } // RocketMQConfigUtils.java /** * The bean name of the internally managed RocketMQ transaction annotation processor. */ public static final String ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME = "org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor"; 详细解析,见 「7.3 RocketMQTransactionAnnotationProcessor」 中。6.3 JacksonFallbackConfiguration

org.apache.rocketmq.spring.autoconfigure.JacksonFallbackConfiguration ,创建 ObjectMapper Bean 对象的配置类。代码如下:

// JacksonFallbackConfiguration.java import com.fasterxml.jackson.databind.ObjectMapper; @Configuration @ConditionalOnMissingBean(ObjectMapper.class) // 不存在 ObjectMapper Bean 时 class JacksonFallbackConfiguration { @Bean public ObjectMapper rocketMQMessageObjectMapper() { return new ObjectMapper(); } } com.fasterxml.jackson.databind.ObjectMapper ,是 Jackson 提供的 JSON 序列化工具类。生产者发送消息时,将消息使用 Jackson 进行序列化。消费者拉取消息时,将消息使用 Jackson 进行反序列化。6.4 ListenerContainerConfiguration

org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration ,实现 ApplicationContextAware、SmartInitializingSingleton 接口,给每个带有注解的 @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。

而 DefaultRocketMQListenerContainer 类,正如其名,是 DefaultRocketMQListener(RocketMQ 消费者的监听器)容器,负责创建 DefaultRocketMQListener 对象,并启动其对应的 DefaultMQPushConsumer(消费者),从而消费消息。

6.4.1 构造方法// ListenerContainerConfiguration.java @Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); private ConfigurableApplicationContext applicationContext; /** * 计数器,用于在 {@link #registerContainer(String, Object)} 方法中,创建 DefaultRocketMQListenerContainer Bean 时,生成 Bean 的名字。 */ private AtomicLong counter = new AtomicLong(0); private StandardEnvironment environment; private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.objectMapper = rocketMQMessageObjectMapper; this.environment = environment; this.rocketMQProperties = rocketMQProperties; } @Override // 实现自 ApplicationContextAware 接口 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } } 严格来说,ListenerContainerConfiguration 并不能说是一个 Configuration 类。这么写的原因,猜测是为了提供给 RocketMQAutoConfiguration 类,进行引入。当然,如果我们将 @Configuration 注解,修改成 @Component 注解,也是能良好的运行。并且 @Configuration 注解,本身自带 @Component 注解。6.4.2 afterSingletonsInstantiated

#afterSingletonsInstantiated() 方法,给每个带有注解的 @RocketMQMessageListener Bean 对象,生成对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

// ListenerContainerConfiguration.java @Override // 实现自 SmartInitializingSingleton 接口 public void afterSingletonsInstantiated() { // <1> 获得所有 @RocketMQMessageListener 注解的 Bean 们 Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); // 遍历 beans 数组,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。 if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); } } <1> 处,获得所有 @RocketMQMessageListener 注解的 Bean 们。<2> 处,遍历 beans 数组,调用 #registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。详细解析,见 「6.4.3 registerContainer」 中。6.4.3 registerContainer

#registerContainer(String beanName, Object bean) 方法,生成(注册)对应的 DefaultRocketMQListenerContainer Bean 对象。代码如下:

// ListenerContainerConfiguration.java private void registerContainer(String beanName, Object bean) { // <1.1> 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。 Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // <1.2> 如果未实现 RocketMQListener 接口,直接抛出 IllegalStateException 异常。 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // <1.3> 获得 @RocketMQMessageListener 注解 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // <1.4> 校验注解配置 validate(annotation); // <2.1> 生成 DefaultRocketMQListenerContainer Bean 的名字 String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; // <2.2> 创建 DefaultRocketMQListenerContainer Bean 对象,并注册到 Spring 容器中。 genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(bean, annotation)); // <3.1> 从 Spring 容器中,获得刚注册的 DefaultRocketMQListenerContainer Bean 对象 DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); // <3.2> 如果未启动,则进行启动 if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } // 打印日志 log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } <1.1> 处,获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。<1.2> 处,如果未实现 RocketMQListener 接口,直接抛出 IllegalStateException 异常。<1.3> 处,获得 @RocketMQMessageListener 注解。<1.4> 处,调用 #validate(RocketMQMessageListener annotation) 方法,校验注解配置。代码如下: // ListenerContainerConfiguration.java private void validate(RocketMQMessageListener annotation) { // 禁止顺序消费 + 广播消费 if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException("Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } } <2.1> 处,生成 DefaultRocketMQListenerContainer Bean 的名字。此处,就可以看到 counter 计数器。<2.2> 处,调用 #createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) 方法,创建 DefaultRocketMQListenerContainer Bean 对象,然后注册到 Spring 容器中。代码如下: // ListenerContainerConfiguration.java private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { // 创建 DefaultRocketMQListenerContainer 对象 DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); // 设置其属性 container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; } <3.1> 处,从 Spring 容器中,获得刚注册的 DefaultRocketMQListenerContainer Bean 对象。? 具体为什么这里要重新 get 一次,艿艿暂时不是很明白。不过呢,先暂时不纠结~继续往下看。<3.2> 处,调用 DefaultRocketMQListenerContainer#isRunning() 方法,判断到未启动,则调用 DefaultRocketMQListenerContainer#start() 方法,则进行启动。? 详细的,后续我们结合 「8.5 DefaultRocketMQListenerContainer」 一起瞅瞅。7. config 包7.1 TransactionHandler

org.apache.rocketmq.spring.config.TransactionHandler ,解析 @RocketMQTransactionListener 注解后,封装的对象。代码如下:

// TransactionHandler.java class TransactionHandler { /** * {@link RocketMQTransactionListener#txProducerGroup()} */ private String name; /** * {@link RocketMQTransactionListener#corePoolSize()} * {@link RocketMQTransactionListener#maximumPoolSize()} * {@link RocketMQTransactionListener#maximumPoolSize()} * {@link RocketMQTransactionListener#keepAliveTime()} * {@link RocketMQTransactionListener#blockingQueueSize()} */ private ThreadPoolExecutor checkExecutor; /** * Bean 的名字 */ private String beanName; /** * 对应的 RocketMQLocalTransactionListener 对象 */ private RocketMQLocalTransactionListener bean; private BeanFactory beanFactory; public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) { // 创建 ThreadPoolExecutor 对象 this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(blockingQueueSize)); } // ... 省略 setting/getting 方法 } 7.2 TransactionHandlerRegistry

TransactionHandlerRegistry 对应的 Bean 对象,在 「6.2.3 transactionHandlerRegistry」 中被创建。

org.apache.rocketmq.spring.config.TransactionHandlerRegistry ,实现 DisposableBean 接口,是 TransactionHandler 的注册表。代码如下:

// TransactionHandlerRegistry.java public class TransactionHandlerRegistry implements DisposableBean { private RocketMQTemplate rocketMQTemplate; /** * {@link TransactionHandler#name} 的 集合 */ private final Set<String> listenerContainers = new ConcurrentSet<>(); public TransactionHandlerRegistry(RocketMQTemplate template) { this.rocketMQTemplate = template; } @Override public void destroy() throws Exception { listenerContainers.clear(); } public void registerTransactionHandler(TransactionHandler handler) throws MQClientException { // <1.1> 不能声明重复的 TransactionHandler if (listenerContainers.contains(handler.getName())) { throw new MQClientException(-1, String.format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(), handler.getBeanName())); } // <1.2> 添加到 listenerContainers 中 listenerContainers.add(handler.getName()); // <2> 创建并启动 TransactionMQProducer rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor()); } } 重心在于 #registerTransactionHandler(TransactionHandler handler) 方法,注册 TransactionHandler 对象。而正如我们在 「7.1 TransactionHandler」 所说,TransactionHandler 目前仅来自 @RocketMQTransactionListener 注解。<1.1> 处,通过 listenerContainers 里来判断是否存在, 不能声明有重复名字的 TransactionHandler 。<1.2> 处,添加到 listenerContainers 中。<2> 处,调用 RocketMQTemplate#createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建并启动 TransactionMQProducer 。后续的逻辑,关于 RocketMQTemplate 类,详细解析,见 「9.4 RocketMQTemplate」 中。7.3 RocketMQTransactionAnnotationProcessor

TransactionHandlerRegistry 对应的 Bean 对象,在 「6.2.3 transactionHandlerRegistry」 中被创建。

org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor ,实现 BeanPostProcessor、BeanFactoryAware、Ordered 接口,扫描 @RocketMQTransactionListener 注解的 Bean 对象,注册到 TransactionHandlerRegistry 中。

7.3.1 构造方法// RocketMQTransactionAnnotationProcessor.java public class RocketMQTransactionAnnotationProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware { private static final Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class); private BeanFactory beanFactory; /** * 不处理的类的集合 */ private final Set<Class<?>> nonProcessedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64)); private TransactionHandlerRegistry transactionHandlerRegistry; public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { this.transactionHandlerRegistry = transactionHandlerRegistry; } @Override // 实现自 BeanFactoryAware 接口 public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } @Override // 实现自 BeanPostProcessor 接口 public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override // 实现自 Ordered 接口 public int getOrder() { return LOWEST_PRECEDENCE; } } 7.3.2 postProcessAfterInitialization

实现 #postProcessAfterInitialization(Object bean, String beanName) 方法,扫描 @RocketMQTransactionListener 注解的 Bean 对象,注册到 TransactionHandlerRegistry 中。代码如下:

// RocketMQTransactionAnnotationProcessor.java @Override // 实现自 BeanPostProcessor 接口 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // <1.1> 如果 nonProcessedClasses 不存在 if (!this.nonProcessedClasses.contains(bean.getClass())) { // <1.2> 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。 Class<?> targetClass = AopUtils.getTargetClass(bean); // <1.3> 添加到 nonProcessedClasses 中,表示后面不处理。 this.nonProcessedClasses.add(bean.getClass()); // <2.1> 获得 @RocketMQTransactionListener 注解 RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class); // <2.2> 如果无注解,则不进行任何逻辑 if (listener == null) { // for quick search log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass()); } else { // <2.3> 如果有注解,则注册到 TransactionHandlerRegistry 中 try { processTransactionListenerAnnotation(listener, bean); } catch (MQClientException e) { log.error("Failed to process annotation " + listener, e); throw new BeanCreationException("Failed to process annotation " + listener, e); } } } return bean; } <1.1> 处,如果 nonProcessedClasses 不存在。<1.2> 处,获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。<1.3> 处,添加到 nonProcessedClasses 中,表示后面不处理。从本质上来说,通过使用 nonProcessedClasses ,来保证一个 @RocketMQTransactionListener 注解的 Bean 对象,只会被处理一次。? 不过貌似,也没碰到会处理多次的情况呀~<2.1> 处,获得 @RocketMQTransactionListener 注解。<2.2> 处,如果无注解,则不进行任何逻辑。<2.3> 处,如果有注解,则调用 #processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) 方法,注册 @RocketMQTransactionListener 到 TransactionHandlerRegistry 中。7.3.3 processTransactionListenerAnnotation

#processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) 方法,注册 @RocketMQTransactionListener 到 TransactionHandlerRegistry 中。代码如下:

// RocketMQTransactionAnnotationProcessor.java private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) throws MQClientException { // <1.1> 校验 @RocketMQTransactionListener 非空 if (transactionHandlerRegistry == null) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate", null); } // <1.2> 如果未实现 RocketMQLocalTransactionListener 接口,直接抛出 IllegalStateException 异常。 if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must implement interface RocketMQLocalTransactionListener", null); } // <2> 将 @RocketMQTransactionListener 注解,创建成 TransactionHandler 对象 TransactionHandler transactionHandler = new TransactionHandler(); transactionHandler.setBeanFactory(this.beanFactory); transactionHandler.setName(listener.txProducerGroup()); transactionHandler.setBeanName(bean.getClass().getName()); transactionHandler.setListener((RocketMQLocalTransactionListener) bean); transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(), listener.keepAliveTime(), listener.blockingQueueSize()); // <3> 注册 TransactionHandler 到 transactionHandlerRegistry 中 transactionHandlerRegistry.registerTransactionHandler(transactionHandler); } <1.1> 处,校验 @RocketMQTransactionListener 注解非空。<1.2> 处,如果未实现 RocketMQLocalTransactionListener 接口,直接抛出 IllegalStateException 异常。<2> 处,将 @RocketMQTransactionListener 注解,创建成 TransactionHandler 对象。<3> 处,调用 TransactionHandlerRegistry#registerTransactionHandler(TransactionHandler handler) 方法,注册 TransactionHandler 对象。8. support 包8.1 RocketMQHeaders

org.apache.rocketmq.spring.support.RocketMQHeaders ,RocketMQ Message Header 属性名的枚举。代码如下:

// RocketMQHeaders.java /** * Represents the RocketMQ message protocol that is used during the data exchange. */ public class RocketMQHeaders { public static final String KEYS = "KEYS"; public static final String TAGS = "TAGS"; public static final String TOPIC = "TOPIC"; public static final String MESSAGE_ID = "MESSAGE_ID"; public static final String BORN_TIMESTAMP = "BORN_TIMESTAMP"; public static final String BORN_HOST = "BORN_HOST"; public static final String FLAG = "FLAG"; public static final String QUEUE_ID = "QUEUE_ID"; public static final String SYS_FLAG = "SYS_FLAG"; public static final String TRANSACTION_ID = "TRANSACTION_ID"; public static final String PROPERTIES = "PROPERTIES"; } 8.2 RocketMQUtil

org.apache.rocketmq.spring.support.RocketMQUtil ,RocketMQ 工具类。我们先不看这里的代码解析,等到需要看的时候,艿艿会专门提到。

Let's Go ~我们先跳到 「8.5 DefaultRocketMQListenerContainer」 中。

8.2.1 convertToRocketMessage

#convertToRocketMessage(objectMapper, charset, destination, message) 方法,将 message 转换成 RocketMQ Message 对象。代码如下:

// RocketMQUtil.java public static org.apache.rocketmq.common.message.Message convertToRocketMessage(ObjectMapper objectMapper, String charset, String destination, org.springframework.messaging.Message<?> message) { // 生成消息的 bytes 数组 Object payloadObj = message.getPayload(); byte[] payloads; if (payloadObj instanceof String) { payloads = ((String) payloadObj).getBytes(Charset.forName(charset)); } else if (payloadObj instanceof byte[]) { payloads = (byte[]) message.getPayload(); } else { try { // <X> String jsonObj = objectMapper.writeValueAsString(payloadObj); payloads = jsonObj.getBytes(Charset.forName(charset)); } catch (Exception e) { throw new RuntimeException("convert to RocketMQ message failed.", e); } } // 获得 topic、tag 属性 String[] tempArr = destination.split(":", 2); String topic = tempArr[0]; String tags = ""; if (tempArr.length > 1) { tags = tempArr[1]; } // 创建 Message 对象,传入上述变量到其构造方法 org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(topic, tags, payloads); // 如果 MessageHeaders 非空,逐个处理 MessageHeaders headers = message.getHeaders(); if (Objects.nonNull(headers) && !headers.isEmpty()) { // 设置 KEYS 属性 Object keys = headers.get(RocketMQHeaders.KEYS); if (!StringUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key rocketMsg.setKeys(keys.toString()); } // 设置 FLAG 属性 Object flagObj = headers.getOrDefault("FLAG", "0"); int flag = 0; try { flag = Integer.parseInt(flagObj.toString()); } catch (NumberFormatException e) { // Ignore it log.info("flag must be integer, flagObj:{}", flagObj); } rocketMsg.setFlag(flag); // 设置 WAIT 属性 Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true"); boolean waitStoreMsgOK = Boolean.TRUE.equals(waitStoreMsgOkObj); rocketMsg.setWaitStoreMsgOK(waitStoreMsgOK); // 设置 USERS_ 属性 headers.entrySet().stream() .filter(entry -> !Objects.equals(entry.getKey(), RocketMQHeaders.KEYS) && !Objects.equals(entry.getKey(), "FLAG") && !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "KEYS", "FLAG", "WAIT_STORE_MSG_OK" .forEach(entry -> { rocketMsg.putUserProperty("USERS_" + entry.getKey(), String.valueOf(entry.getValue())); // add other properties with prefix "USERS_" }); } return rocketMsg; } 比较简单,就是创建 RocketMQ Message 对象的过程。<X> 处,我们会看到使用 objectMapper 写入,使用 JSON 序列化,将 messageType 转换成 String 类型。8.2.2 convert

#convert(RocketMQLocalTransactionListener listener) 方法,将 Spring Boot RocketMQ RocketMQLocalTransactionListener 监听器,转换成 RocketMQ TransactionListener 监听器。代码如下:

// RocketMQUtil.java public static TransactionListener convert(RocketMQLocalTransactionListener listener) { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object obj) { // <1> 转换 RocketMQ Message 转换成 Spring Message 对象 // <2> 回调 RocketMQLocalTransactionListener 监听器 RocketMQLocalTransactionState state = listener.executeLocalTransaction(convertToSpringMessage(message), obj); // <3> 转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ LocalTransactionState 事务状态 return convertLocalTransactionState(state); } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // <1> 转换 RocketMQ Message 转换成 Spring Message 对象 // <2> 回调 RocketMQLocalTransactionListener 监听器 RocketMQLocalTransactionState state = listener.checkLocalTransaction(convertToSpringMessage(messageExt)); // <3> 转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ LocalTransactionState 事务状态 return convertLocalTransactionState(state); } }; } <1> 处,调用 #convertToSpringMessage(Message message) 方法,转换 RocketMQ Message 转换成 Spring Message 对象。代码如下: // RocketMQUtil.java // checkLocalTransaction 调用 public static org.springframework.messaging.Message convertToSpringMessage(org.apache.rocketmq.common.message.MessageExt message) { org.springframework.messaging.Message retMessage = MessageBuilder.withPayload(message.getBody()). setHeader(RocketMQHeaders.KEYS, message.getKeys()). setHeader(RocketMQHeaders.TAGS, message.getTags()). setHeader(RocketMQHeaders.TOPIC, message.getTopic()). setHeader(RocketMQHeaders.MESSAGE_ID, message.getMsgId()). setHeader(RocketMQHeaders.BORN_TIMESTAMP, message.getBornTimestamp()). setHeader(RocketMQHeaders.BORN_HOST, message.getBornHostString()). setHeader(RocketMQHeaders.FLAG, message.getFlag()). setHeader(RocketMQHeaders.QUEUE_ID, message.getQueueId()). setHeader(RocketMQHeaders.SYS_FLAG, message.getSysFlag()). setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()). setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()). build(); return retMessage; } // executeLocalTransaction 调用 public static org.springframework.messaging.Message convertToSpringMessage(org.apache.rocketmq.common.message.Message message) { org.springframework.messaging.Message retMessage = MessageBuilder.withPayload(message.getBody()). setHeader(RocketMQHeaders.KEYS, message.getKeys()). setHeader(RocketMQHeaders.TAGS, message.getTags()). setHeader(RocketMQHeaders.TOPIC, message.getTopic()). setHeader(RocketMQHeaders.FLAG, message.getFlag()). setHeader(RocketMQHeaders.TRANSACTION_ID, message.getTransactionId()). setHeader(RocketMQHeaders.PROPERTIES, message.getProperties()). build(); return retMessage; } <2> 处,回调 RocketMQLocalTransactionListener 监听器的对应的方法。<3> 处,调用 #convertLocalTransactionState(RocketMQLocalTransactionState state) 方法,转换 Spring Boot RocketMQ RocketMQLocalTransactionState 事务状态,成 RocketMQ LocalTransactionState 事务状态。代码如下: // RocketMQUtil.java private static LocalTransactionState convertLocalTransactionState(RocketMQLocalTransactionState state) { switch (state) { case UNKNOWN: return LocalTransactionState.UNKNOW; case COMMIT: return LocalTransactionState.COMMIT_MESSAGE; case ROLLBACK: return LocalTransactionState.ROLLBACK_MESSAGE; } // Never happen log.warn("Failed to covert enum type RocketMQLocalTransactionState.%s", state); return LocalTransactionState.UNKNOW; } 8.4 RocketMQListenerContainer

org.apache.rocketmq.spring.support.RocketMQListenerContainer ,实现 DisposableBean 接口,RocketMQ 消费者 Listener 容器(Container)接口。代码如下:

// RocketMQListenerContainer.java public interface RocketMQListenerContainer extends DisposableBean { /** * Setup the message listener to use. Throws an {@link IllegalArgumentException} if that message listener type is * not supported. */ void setupMessageListener(RocketMQListener<?> messageListener); } 关于 #setupMessageListener(RocketMQListener<?> messageListener) 的实现方法,我们会在 DefaultRocketMQListenerContainer 中看到。8.5 DefaultRocketMQListenerContainer

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer ,实现 InitializingBean、RocketMQListenerContainer、SmartLifecycle 接口,DefaultRocketMQListener(RocketMQ 消费者的监听器)容器,负责创建 DefaultRocketMQListener 对象,并启动其对应的 DefaultMQPushConsumer(消费者),从而消费消息。

8.5.1 构造方法// DefaultRocketMQListenerContainer.java public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class); private long suspendCurrentQueueTimeMillis = 1000; /** * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br> * >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; /** * DefaultMQPushConsumer 对象。 * * 在 {@link #initRocketMQPushConsumer()} 方法中,进行创建 */ private DefaultMQPushConsumer consumer; /** * 消息类型 */ private Class messageType; /** * 是否在运行中 */ private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; // 省略各种 setting/getting 方法 } 8.5.2 afterPropertiesSet

实现 #afterPropertiesSet() 方法,代码如下:

// DefaultRocketMQListenerContainer.java @Override public void afterPropertiesSet() throws Exception { // 初始化 DefaultMQPushConsumer 对象 initRocketMQPushConsumer(); // 获得 messageType 属性 this.messageType = getMessageType(); log.debug("RocketMQ messageType: {}", messageType.getName()); } <1> 处,初始化 DefaultMQPushConsumer 对象。详细解析,见 「8.5.2 initRocketMQPushConsumer」 。<2> 处,调用 #getMessageType() 方法,获得 messageType 属性。代码如下:// DefaultRocketMQListenerContainer.java private Class getMessageType() { // 获得 Bean 对应的 Class 类名。因为有可能被 AOP 代理过。 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); // 获得接口的 Type 数组 Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { // 此处,是以父类的接口为准 interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { // 遍历 interfaces 数组 for (Type type : interfaces) { // 要求 type 是泛型参数 if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; // 要求是 RocketMQListener 接口 if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); // 取首个元素 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } } 没啥好说,不理解的胖友,可以看看 《Java 中的 Type 类型详解》 。当然,也可以不看,知道意思就好列。8.5.2.1 initRocketMQPushConsumer

#initRocketMQPushConsumer() 方法,初始化 DefaultMQPushConsumer 对象。代码如下:

// DefaultRocketMQListenerContainer.java private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); // <1> 创建 DefaultMQPushConsumer 对象 consumer = new DefaultMQPushConsumer(consumerGroup); // <2.1> 设置其属性 consumer.setNamesrvAddr(nameServer); consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } // <2.2> 设置 messageModel 属性 switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } // <2.3> 设置 selectorType 属性 switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } // <2.4> 设置 messageListener 属性 switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } // <3> 如果实现了 RocketMQPushConsumerLifecycleListener 接口,则调用 prepareStart 方法,执行准备初始化的方法 if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } <1> 处,创建 DefaultMQPushConsumer 对象。<2.1> 处,设置其属性。<2.2> 处,设置 messageModel 属性。<2.3> 处,设置 selectorType 属性。<2.4> 处,设置 messageListener 属性。其中,我们会看到,根据不同的 consumeMode 值,创建不同的 MessageListener 对象。所以,我们放在 「8.5.2.2 DefaultMessageListenerOrderly」 和 「8.5.2.3 DefaultMessageListenerConcurrently」 中解析。<3> 处,如果实现了 RocketMQPushConsumerLifecycleListener 接口,则调用 RocketMQPushConsumerLifecycleListener#prepareStart(consumer) 方法,执行准备初始化的方法。8.5.2.2 DefaultMessageListenerConcurrently

DefaultMessageListenerConcurrently ,是 DefaultRocketMQListenerContainer 的内部类,实现 org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently 接口,提供并发消费。代码如下:

// DefaultRocketMQListenerContainer#DefaultMessageListenerConcurrently.java public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); // <1> 转换消息 // <2> 执行消费 rocketMQListener.onMessage(doConvertMessage(messageExt)); // 打印日志 long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { // <3.1> 发生异常,返回稍后再消费 log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // <3.2> 返回消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } <1> 处,调用 #doConvertMessage(MessageExt messageExt) 方法,将 RocketMQ MessageExt 消息,转换成对应的 messageType 类型的消息。代码如下: // DefaultRocketMQListenerContainer.java private Object doConvertMessage(MessageExt messageExt) { // 如果是 MessageExt 类型,直接返回 if (Objects.equals(messageType, MessageExt.class)) { return messageExt; } else { // 先将 byte 数组(body),转换成 String 类型 String str = new String(messageExt.getBody(), Charset.forName(charset)); // 如果是 String 类型,直接返回 if (Objects.equals(messageType, String.class)) { return str; } else { // If msgType not string, use objectMapper change it. // <X> 使用 objectMapper 读取,使用 JSON 反序列化,将 String 转换成 messageType 类型 try { return objectMapper.readValue(str, messageType); } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); throw new RuntimeException("cannot convert message to " + messageType, e); } } } } 总体代码比较简单,胖友自己瞅瞅。在 <X> 处,我们会看到使用 objectMapper 读取,使用 JSON 反序列化,将 String 转换成 messageType 类型。<2> 处,调用 RocketMQListener#onMessage(T message) 方法,消费消息。<3.1> 处,发生异常,返回稍后再消费。<3.2> 处,返回消费成功。并发消费的逻辑,是 RocketMQ Consumer 内部逻辑所处理的。8.5.2.3 DefaultMessageListenerOrderly

DefaultMessageListenerOrderly ,是 DefaultRocketMQListenerContainer 的内部类,实现 org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly 接口,提供顺序消费。代码如下:

// DefaultRocketMQListenerContainer#DefaultMessageListenerOrderly.java public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @SuppressWarnings("unchecked") @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 遍历 MessageExt 消息 for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); // 转换消息 // 执行消费 rocketMQListener.onMessage(doConvertMessage(messageExt)); // 打印日志 long costTime = System.currentTimeMillis() - now; log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { // 发生异常,设置中断消费一会 log.warn("consume message failed. messageExt:{}", messageExt, e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } // 返回消费成功 return ConsumeOrderlyStatus.SUCCESS; } } 总体逻辑,和 「8.5.2.2 DefaultMessageListenerConcurrently」 类似,就不重复解析了。8.5.3 start

实现 #start() 方法,启动 DefaultMQPushConsumer 。代码如下:

// DefaultRocketMQListenerContainer.java @Override public void start() { // 如果已经启动,则抛出 IllegalStateException 异常 if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } // 启动 DefaultMQPushConsumer try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } // 标记已经启动 this.setRunning(true); log.info("running container: {}", this.toString()); } @Override // 实现自 SmartLifecycle->Lifecycle 接口 public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override // 实现自 SmartLifecycle 接口 public boolean isAutoStartup() { return true; } @Override // 实现自 SmartLifecycle->Phased 接口 public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } 8.5.4 stop

实现 #stop() 方法,关闭 DefaultMQPushConsumer 。代码如下:

// DefaultRocketMQListenerContainer.java @Override public void stop() { // 必须在运行中 if (this.isRunning()) { // 关闭 DefaultMQPushConsumer if (Objects.nonNull(consumer)) { consumer.shutdown(); } // 标记不在启动 setRunning(false); } } @Override public void stop(Runnable callback) { stop(); callback.run(); } 8.5.5 destroy

实现 #destroy() 方法,关闭 DefaultMQPushConsumer 。代码如下:

// DefaultRocketMQListenerContainer.java @Override public void destroy() { // 标记已经停止 this.setRunning(false); // 关闭 DefaultMQPushConsumer if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } 9. core 包9.1 RocketMQListener

org.apache.rocketmq.spring.core.RocketMQListener ,RocketMQ 消费者的消费消息监听器。代码如下:

// RocketMQListener.java public interface RocketMQListener<T> { // <T> 泛型,声明消费的消息类型 /** * 消费消息 * * @param message 消息 */ void onMessage(T message); } 9.2 RocketMQPushConsumerLifecycleListener

org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener ,继承 RocketMQConsumerLifecycleListener 接口,Push 模式的消费者的 RocketMQConsumerLifecycleListener 接口。代码如下:

// RocketMQPushConsumerLifecycleListener.java public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<DefaultMQPushConsumer> { } 目前暂无任何方法。使用时,建议实现该接口。未来来说,估计会新增 RocketMQPullConsumerLifecycleListener 接口,Pull 模式的消费者的 RocketMQConsumerLifecycleListener 接口。9.3 RocketMQLocalTransactionListener

org.springframework.messaging.Message.RocketMQLocalTransactionListener ,RocketMQ 本地事务监听器接口。代码如下:

// RocketMQLocalTransactionListener.java public interface RocketMQLocalTransactionListener { /** * 执行本地事务 * * @param msg 消息 * @param arg 方法参数 * @return 本地事务状态 */ RocketMQLocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * 检查本地事务状态 * * @param msg 消息 * @return 本地事务状态 */ RocketMQLocalTransactionState checkLocalTransaction(final Message msg); } 其中,org.apache.rocketmq.spring.core.RocketMQLocalTransactionState ,RocketMQ 本地事务状态。代码如下:// RocketMQLocalTransactionState.java public enum RocketMQLocalTransactionState { /** * 已提交 */ COMMIT, /** * 已回滚 */ ROLLBACK, /** * 未知 */ UNKNOWN } 9.4 RocketMQTemplate

org.apache.rocketmq.spring.core.RocketMQTemplate ,继承 AbstractMessageSendingTemplate 抽象类,实现 InitializingBean、DisposableBean 接口,RocketMQ 客户端的方法模板类。

9.4.1 构造方法// RocketMQTemplate.java private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class); /** * 消息生产者 */ private DefaultMQProducer producer; private ObjectMapper objectMapper; private String charset = "UTF-8"; /** * 消息队列选择器 */ private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash(); /** * TransactionMQProducer 的映射 * * KEY:{@link TransactionMQProducer#getProducerGroup()} 事务生产者对应的分组 */ private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!! 9.4.2 afterPropertiesSet

实现 #afterPropertiesSet() 方法,启动 DefaultMQProducer 。代码如下:

// RocketMQTemplate.java @Override public void afterPropertiesSet() throws Exception { Assert.notNull(producer, "Property 'producer' is required"); producer.start(); } 9.4.3 发送消息

发送消息的方法非常多,如下:

syncSendsyncSendOrderlyasyncSendasyncSendOrderlysendOneWaysendOneWayOrderly

#syncSend(String destination, Message<?> message)方法,同步发送消息。代码如下:

// RocketMQTemplate.java /** * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes. * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS * notification, SMS marketing system, etc.. </p> * <p> * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry * {@link DefaultMQProducer#getRetryTimesWhenSendFailed} times before claiming failure. As a result, multiple * messages may potentially delivered to broker(s). It's up to the application developers to resolve potential * duplication issue. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} * @return {@link SendResult} */ public SendResult syncSend(String destination, Message<?> message) { return syncSend(destination, message, producer.getSendMsgTimeout()); } public SendResult syncSend(String destination, Message<?> message, long timeout) { return syncSend(destination, message, timeout, 0); } public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) { // <1> 校验消息 if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSend failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); // <2> 将 message 转换成 RocketMQ Message 对象 org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); // <3> 设置 delayLevel 属性 if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); } // <4> 同步发送消息 SendResult sendResult = producer.send(rocketMsg, timeout); // 打印日志 long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("syncSend failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } <1> 处,校验消息。<2> 处,调用 RocketMQUtil#convertToRocketMessage(objectMapper, charset, destination, message) 方法,将 message 转换成 RocketMQ Message 对象。此时,我们可以跳转到 「8.2.1 convertToRocketMessage」 。当然,也可以不看。嘿嘿~<3> 处,设置 delayLevel 属性。<4> 处,调用 DefaultMQProducer#send(rocketMsg, timeout) 方法,同步发送消息。另外,有一点要注意,传入的方法 message 参数,是 org.apache.rocketmq.common.message.Message 对象。所以说,Spring Boot RocketMQ 库,还有一个作用,是屏蔽底层的 RocketMQ 的存在,这样呢,未来如果我们希望替换掉 RocketMQ ,也是非常方便的。? 算是一个“彩蛋”。感兴趣的胖友,可以再看看其它地方,也会发现有这样一个设计目的。9.4.4 destroy

实现 #destroy() 方法,关闭生产者们。代码如下:

// RocketMQTemplate.java @Override // 实现自 DisposableBean 接口 public void destroy() { // 关闭 producer if (Objects.nonNull(producer)) { producer.shutdown(); } // 关闭 cache for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) { if (Objects.nonNull(kv.getValue())) { kv.getValue().shutdown(); } } cache.clear(); } 9.4.5 createAndStartTransactionMQProducer

#createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建并启动 TransactionMQProducer 对象。代码如下:

// RocketMQTemplate.java /** * Create and start a transaction MQProducer, this new producer is cached in memory. * <p>Note: This method is invoked internally when processing {@code @RocketMQLocalTransactionListener}, it is not * recommended to directly use this method by user. * * @param txProducerGroup Producer (group) name, unique for each producer * @param transactionListener TransactoinListener impl class * @param executorService Nullable. * @return true if producer is created and started; false if the named producer already exists in cache. * @throws MessagingException */ public boolean createAndStartTransactionMQProducer(String txProducerGroup, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) throws MessagingException { // <1> 如果已经存在,则直接返回 txProducerGroup = getTxProducerGroupName(txProducerGroup); if (cache.containsKey(txProducerGroup)) { log.info(String.format("get TransactionMQProducer '%s' from cache", txProducerGroup)); return false; } // <2> 创建 TransactionMQProducer 对象 TransactionMQProducer txProducer = createTransactionMQProducer(txProducerGroup, transactionListener, executorService); try { // <3> 启动 TransactionMQProducer 对象 txProducer.start(); // <4> 添加到 cache 中 cache.put(txProducerGroup, txProducer); } catch (MQClientException e) { throw RocketMQUtil.convert(e); } return true; } <1> 处,如果已经存在,则直接返回。<2> 处,调用 #createAndStartTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) 方法,创建 TransactionMQProducer 对象。代码如下: // RocketMQTemplate.java private TransactionMQProducer createTransactionMQProducer(String name, RocketMQLocalTransactionListener transactionListener, ExecutorService executorService) { Assert.notNull(producer, "Property 'producer' is required"); Assert.notNull(transactionListener, "Parameter 'transactionListener' is required"); // 创建 TransactionMQProducer 对象 TransactionMQProducer txProducer = new TransactionMQProducer(name); // <X> 转换监听器,并设置到 txProducer 中 txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener)); // 设置其它属性 txProducer.setNamesrvAddr(producer.getNamesrvAddr()); if (executorService != null) { txProducer.setExecutorService(executorService); } txProducer.setSendMsgTimeout(producer.getSendMsgTimeout()); txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed()); txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed()); txProducer.setMaxMessageSize(producer.getMaxMessageSize()); txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch()); txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK()); return txProducer; } <X> 处,会调用 RocketMQUtil#convert(RocketMQLocalTransactionListener listener) 方法,将 RocketMQLocalTransactionListener 转换成 RocketMQ TransactionListener 的监听器。详细解析,见 「8.5.3 convert」 。当然,也可以不看,哈哈哈哈。<3> 处,启动 TransactionMQProducer 对象。<4> 处,添加到 cache 中。9.4.6 sendMessageInTransaction

#sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) 方法,发送事务消息。代码如下:

// RocketMQTemplate.java /** * Send Spring Message in Transaction * * @param txProducerGroup the validate txProducerGroup name, set null if using the default name * @param destination destination formats: `topicName:tags` * @param message message {@link org.springframework.messaging.Message} * @param arg ext arg * @return TransactionSendResult * @throws MessagingException */ public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException { try { // <1> 获得 TransactionMQProducer 对象 TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup); // <2> 将 message 转换成 RocketMQ Message 对象 org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); // <3> 发送事务消息 return txProducer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); } } <1> 处,调用 #stageMQProducer(String name) 方法,获得 TransactionMQProducer 对象。代码如下: // RocketMQTemplate.java private TransactionMQProducer stageMQProducer(String name) throws MessagingException { name = getTxProducerGroupName(name); // 获得 TransactionMQProducer 对象 TransactionMQProducer cachedProducer = cache.get(name); if (cachedProducer == null) { throw new MessagingException(String.format("Can not found MQProducer '%s' in cache! please define @RocketMQLocalTransactionListener class or invoke createOrGetStartedTransactionMQProducer() to create it firstly", name)); } return cachedProducer; } private String getTxProducerGroupName(String name) { return name == null ? RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME : name; } <2> 处,将 message 转换成 RocketMQ Message 对象。<3> 处,调用 TransactionMQProducer#sendMessageInTransaction(Message msg, Object arg) 方法,发送事务消息。 ---来自腾讯云社区的---芋道源码

关于作者: 瞎采新闻

这里可以显示个人介绍!这里可以显示个人介绍!

热门文章

留言与评论(共有 0 条评论)
   
验证码: