消息总线
消息总线
功能描述
blade-mq-spring-boot-starter 是基于 SpringBoot 的 MQ启动器,提供了对消息队列的集成和封装,简化了消息队列的配置和使用。该启动器支持公共消息总线、点对点的服务总线主题,点对点服务总线支持版本控制功能。
GAV 坐标
blade 全模块 2.2.5+ 版本,支持 SpringBoot 2.0+、jdk21+
<dependency>
<groupId>team.aikero.blade</groupId>
<artifactId>blade-mq-spring-boot-starter</artifactId>
</dependency>implementation(commonLibs.blade.mq.spring.boot.starter)blade 全模块 3.0.0+ 版本,支持 SpringBoot 3.0+、jdk21+
<dependency>
<groupId>team.aikero.blade</groupId>
<artifactId>blade-mq-spring-boot-starter</artifactId>
</dependency>implementation(commonLibs.blade.mq.spring.boot.starter)快速集成
- 选择合适的 GAV,在项目中的 pom.xml 添加依赖。
<dependency>
<groupId>team.aikero.blade</groupId>
<artifactId>blade-mq-spring-boot-starter</artifactId>
</dependency>- 在项目的
application.yaml中添加下方代码。
spring:
config:
import:
- optional:nacos:blade-config <--- 在项目配置之前引入此配置
- optional:nacos:${spring.application.name}详细说明
配置示例
在application.yml文件中,可以进行以下配置:
spring:
rabbitmq:
host: ${rabbit.host}
port: ${rabbit.port}
username: ${rabbit.username}
password: ${rabbit.passwd}
virtual-host: ${rabbit.vhost}
application:
name: blade-mq-spring-boot-starter
blade:
mq:
enabled: true # 是否启用MQ
name: ${spring.application.name} # 应用名称,默认取 spring.application.name
version: 1.0.0 # 应用版本
autoDowngrading: true # 是否开启自动降级,默认为true。当发送应用总线消息的目标服务为自己时,会自动降级为本地处理。
dynamic: # 动态配置,多实例支持
xxx:
host: ${rabbit.host}
port: ${rabbit.port}
username: ${rabbit.username}
password: ${rabbit.passwd}
virtual-host: ${rabbit.vhost}代码示例
通过以下方式,可以轻松集成 MQ:
注解方式:@EventListener
继承实现:GenericApplicationListener、SimpleApplicationApplication
以下是一个注解方式的简单示例,展示了如何发送和接收消息:
@Autowired
ApplicationEventPublisher publisher;
// 发送公共总线消息
publisher.publishEvent(BusEvents.messageBus("message bus test"));
// 接收公共总线消息
@EventListener
public void handleEvent(MessageBusEvent<String> event) {
System.out.println("Received message: " + event.getMessage());
}
// 发送本应用消息,若开启自动降级,将转为本地处理
publisher.publishEvent(BusEvents.appBus("app bus event","custom id"));
// 发送本应用消息,忽略版本
publisher.publishEvent(BusEvents.appBus("*","app bus event"));
// 向订单服务发送消息,忽略版本
publisher.publishEvent(BusEvents.appBus("order-service","*","app bus event"));
// 向订单服务发送消息,指定版本
publisher.publishEvent(BusEvents.appBus("order-service","1.0.0","app bus event"));
// 向订单服务发送消息,版本与当前服务版本一致
publisher.publishEvent(BusEvents.appBus("order-service","app bus event"));
// 甚至自定义 exchange 和 routerKey(不建议,但可以
ApplicationBusEvent<String> customExchange = new ApplicationBusEvent<>(
data = "消息体",
id = "消息 id",
routerKey = "你的自定义路由",
exchange = "你的自定义交换机",
name = "当前应用名",
version = "当前应用版本",
targetName = "目标应用名",
targetVersion = "目标应用版本"
)
publisher.publishEvent(event);
// 可以接受上诉所有泛型为 String 的 ApplicationBusEvent
@EventListener
public void handleEvent(ApplicationBusEvent<String> event) {
System.out.println("Received message: " + event.getData());
}继承方式:
// 使用 GenericApplicationListener
public interface CustomGenericApplicationListener implements GenericApplicationListener {
/**
* 确定此侦听器是否实际支持给定的事件类型
*/
boolean supportsEventType(ResolvableType eventType) {
// eventType 提供当前事件类型及嵌套泛型信息
// 例如:ApplicationBusEvent<String>
resolvableType.resolve() == ApplicationBusEvent.class
resolvableType.getGeneric(0).resolve() == String.class
}
void onApplicationEvent(ApplicationEvent event) {
// do something
}
}
// 使用 SmartApplicationListener
public interface CustomSmartApplicationListener implements SmartApplicationListener {
/**
* 确定此侦听器是否实际支持给定的事件类型
*/
boolean supportsEventType(Class<?> eventClass) {
// eventClass 提供当前事件类型
// 例如:ApplicationBusEvent<String>,eventType 为 ApplicationBusEvent
}
boolean supportsSourceType(Class<?> sourceClass) {
// sourceClass 提供事件泛型类型
// 例如:ApplicationBusEvent<String>,sourceClass 为 String
}
void onApplicationEvent(ApplicationEvent event) {
// do something
}
}vhost 切换
某些情况下我们希望发送到外部消息队列,此时需要临时切换 RabbitMq,消息队列工具提供了动态切换能力。
配置如下:
blade:
mq:
enabled: true # 是否启用MQ
name: ${spring.application.name} # 应用名称,默认取 spring.application.name
version: 1.0.0 # 应用版本
autoDowngrading: true # 是否开启自动降级,默认为true。当发送应用总线消息的目标服务为自己时,会自动降级为本地处理。
dynamic: # 动态配置,多实例支持
demo:
host: ${rabbit.host}
port: ${rabbit.port}
username: ${rabbit.username}
password: ${rabbit.passwd}
virtual-host: ${rabbit.vhost}可以通过 DynamicRabbitHolder.push() 切换当前线程的 vhost,例如:
// 1.
try {
// 切换到 demo 实例
DynamicRabbitHolder.push("demo")
publisher.publishEvent(BusEvents.messageBus("message"))
} finally {
DynamicRabbitHolder.poll()
}
// 注入 BladeRabbitTemplate Bean
BladeRabbitTemplate.use("demo").send()自定义队列/新增队列
一般情况下,上述方式支持大部分消息队列需求。如果确实需要新增队列,可以通过如下方式进行扩展:
// 1. 定义队列关系, Starter 自动检测进行声明,同名会自动忽略,同名不同类型可能会导致 MQ 消息队列错误,需手动清理后使用。
// 消费者:需进行配置队列关系配置,用于注册队列监听器。
@Bean
public Queue queue() {
return new Queue("queue.order)
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("queue.order");
}
@Bean
public Exchange exchange() {
return new TopicExchange("exchange.order");
}
// 2. 定义监听器
@EventListener
public void handleEvent(OrderEvent event) {
System.out.println("收到订单消息: " + event.getData());
}
// 3. 定义事件,继承于 AbstractBusEvent
// 生产者:提供事件类型
public class OrderEvent extends AbstractBusEvent<Order> {
...
}
// 4. 发送事件
// 生产者:提供事件数据
OrderEvent customExchange = new OrderEvent(
data = new Order("你的订单数据"),
id = "消息 id",
routerKey = "queue.order", // 新增的定义的路由键,FanoutExchange 时,传空字符串
exchange = "exchange.order", // 新增的交换机名
name = BusEvents.appName(),
version = BusEvents.appVersion(),
)
publisher.publishEvent(new OrderEvent(order));实现方案
核心功能
公共消息总线(广播)
公共监听的queue,持久化,无版本区分应用总线(点对点) 应用总线,持久化,自动删除。队列名区分服务版本,例: queue.order.1.0.0、queue.order.1.0.1
每个应用均自动创建当前版本订阅
可通过 routing-key 多播,跨服务、跨版本 消息传送版本控制
避免流量切换时引发链路异常
思路
Spring 事件监听器,识别 BusEvent 泛型
识别是否为总线消息事件,组装 routerKey 转发至消息队列
注册消息队列 queue 监听器
收到消息后,降级为本地事件,根据泛型类型投递到 Spring 事件广播器
技术细节
事件类型均是 BusEvent,如何区分是本地/远程事件?
基于 BusEvent.downgraded 属性,若为 true,则为本地事件
详见 LocalForwardingToRemoteAdapter事件类型泛型类型,如何识别?
消费者:扩展 Spring @EventListener 后置处理器,获取方法参数泛型。
生产者:扩展 MessageConverter,识别泛型类型信息,记录在消息体。
事件送达:根据生产者泛型类型,获取监听器集合,投递到 Spring 事件广播器。若为空,则不消费。
详见 GenericEventListenerFactory
流程图
优缺点
优点
深度集成 Spring 事件机制,简化操作;
IDEA 智能提示支持: 方便快速查找事件发布点
全功能 @EventListener,支持 condition 通过 SPEL
表达式动态确定是否消费消息
服务消息版本隔离便于开启灰度、蓝绿部署,降低版本滚动发布兼容负担
平台无关,不关心底层消息队列 API,无需手动确认、重试
事件消息无异常,自动 ack 消息
异常时,自动 requeue基于事件泛型类型分发消息,无需关心 queue、exchange、binding 配置
例如:ApplicationBusEvent<ProductRemoval>、ApplicationBusEvent<PlaceAnOrder>
在服务总线内,但自动根据泛型分发事件。 即 queue、exchange、routing-key
相同,事件类型不同,也能被分发至正确的监听器。自动声明队列、交换机、路由,提供可读异常
缺点
当前基于 RabbitMQ,未支持广播模式。Rabbit 对于每个 queue 下的所有消费者,进行轮询分发,队列的消费者仅接受一次消息 ack 后不会重新入队,故此无法进行实际意义上的广播,多个消费者会进行故障转移。
有两种方式可以实现,但基于实际场景,暂未实现:
可以通过 Redis 进行 count down,所有消费者消费后进行 ack,引入额外组件,暂不考虑;某种程度上说,引入的话不如直接基于 RedisStream 数据结构重新实现发布订阅。)
Fanout 模式,对于每个服务每个版本均创建 queue,通过 fanout exchange 发送到每个 queue。但这会使队列数量激增,影响计费,暂未进行实现。
