本文介绍: Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。RocketMQ Binder 已由 Spring Cloud Alibaba 实现。Binder 抽象也是该框架扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

引言

当前的微服务架构下,使用消息队列(MQ)技术实现服务解耦和削峰填谷的重要策略为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系实现这些MQ组件的无缝切换,以减少代码修改需求

Spring Cloud Stream 通过其与主流消息中间件的灵活集成实现了通过仅修改配置文件方式切换不同的MQ实现,从而提高系统适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream一个用于构建消息驱动的微服务应用程序框架

基于 Spring Boot 构建用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理连接。它提供了来自多个供应商中间件的固定配置引入持久发布订阅语义消费者组和分区概念

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并

图一

主要概念

1. application model应用模型

图二.Spring Cloud Stream 应用程序

中间件提供的 Binder 来处理绑定 应用程序通过绑定这个 Binder 与其建立联系,发送消息应用程序通过 outputs 通道消息传递BinderBinder 再把消息消息中间件接收消息消息中间件消息传递BinderBinder 再把消息通过 inputs 通道传递给应用程序

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现

Binder 抽象也是该框架扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model编程模型

核心概念

图四

环境搭建

本文环境

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。

<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
  </dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息

spring:
  cloud:
    stream:
      stream:
        rocketmq:
          binder:
            name-server: 127.0.0.1:9876;127.0.0.1:9877
        function:
          # 组装和绑定
          definitionmyTopicC
        binders:
          default:
            type: rocketmq
        bindings:
          ## 生产者 新版本固定格式  函数名-{out/in}-{index}
          demoChannel-out-0:
            destination: boot-mq-topic
          ## 消费者 新版本固定格式  函数名字-{out/in}-{index}
          demoChannel-in-0:
            destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:
  cloud:
    stream:
      stream:
        kafka:
          binder:
            brokers: 127.0.0.1:9092
        function:
          # 组装和绑定
          definitionmyTopicC
        binders:
          default:
            type: kafka
        bindings:
          ## 生产者 新版本固定格式  函数名-{out/in}-{index}
          demoChannel-out-0:
            destination: boot-mq-topic
          ## 消费者 新版本固定格式  函数名字-{out/in}-{index}
          demoChannel-in-0:
            destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者

@RestController
@Slf4j
public class ProducerStream {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/test-stream")
    public String testStream() {
        streamBridge.send("demoChannel-out-0",
                MessageBuilder
                        .withPayload("消息体")
                        .build()
        );
        return "success";
    }
}

消息消费者

创建一个消息消费者接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {

    @Bean
    public Consumer<String> demoChannel() {
        return message -> {
             log.info("demoChannel接到消息:{}", message);
        };
    }
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置

具体可见官方文档https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging扩展设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理

Spring MessagingSpring 框架中的一个底层模块用于提供统一的消息编程模型。

消息 Message 接口定义

public interface Message<T> {
  //消息体
  T getPayload();

  //消息头
  MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义

@FunctionalInterface
public interface MessageChannel {
 long INDEFINITE_TIMEOUT = -1;
    
    //发送消息,无限期阻塞
 default boolean send(Message<?> message) {
  return send(message, INDEFINITE_TIMEOUT);
 }

    //发送消息,阻塞直到到达指定超时时间
 boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

两个通道都继承自具有消息发送功能MessageChannel

public interface SubscribableChannel extends MessageChannel {
 //通过注册回调函数MessageHandler来实现事件响应
    //注册消息处理器
 boolean subscribe(MessageHandler handler);

 //取消注册消息处理器
 boolean unsubscribe(MessageHandler handler);
}
publiinterface PollableChannel extends MessageChannel {
 //通过轮询操作主动获取消息
 //从通道中接收消息
 @Nullable
 Message<?> receive();

    //指定超时时间,从通道中接收消息
 @Nullable
 Message<?> receive(long timeout);
}

MessageHandler接口定义

@FunctionalInterface
publiinterface MessageHandler {
 //处理消息方法
 void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们主题Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

参考资料

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注