9.2.1 消息通道

消息通道意指消息移动的集成管道移动。它们是连接 Spring Integration 所有其他部分的管道。

Spring Integration 提供了多个管道的实现,包括以下这些:

  • PublishSubscribeChannel —— 消息被发布到 PublishSubscribeChannel 后又被传递给一个或多个消费者。如果有多个消费者,他们都将会收到消息。

  • QueueChannel —— 消息被发布到 QueueChannel 后被存储到一个队列中,直到消息被消费者以先进先出(FIFO)的方式拉取。如果有多个消费者,他们中只有一个能收到消息。

  • PriorityChannel —— 与 QueueChannel 类似,但是与 FIFO 方式不同,消息被冠以 priority 的消费者拉取。

  • RendezvousChannel —— 与 QueueChannel 期望发送者阻塞通道,直到消费者接收这个消息类似,这种方式有效的同步了发送者与消费者。

  • DirectChannel —— 与 PublishSubscribeChannel 类似,但是是通过在与发送方相同的线程中调用消费者来将消息发送给单个消费者,此通道类型允许事务跨越通道。

  • ExecutorChannel —— 与 DirectChannel 类似,但是消息分派是通过 TaskExecutor 进行的,在与发送方不同的线程中进行,此通道类型不支持事务跨通道。

  • FluxMessageChannel —— Reactive Streams Publisher 基于 Project Reactor Flux 的消息通道。(我们将会在第 10 章讨论 Reactive Streams、Reactor 和 Flux)

在 Java 配置和 Java DSL 样式中,输入通道都是自动创建的,默认是 DirectChannel。但是,如果希望使用不同的通道实现,则需要显式地将通道声明为 bean 并在集成流中引用它。例如,要声明 PublishSubscribeChannel,需要声明以下 @Bean 方法:

@Bean
public MessageChannel orderChannel() {
    return new PublishSubscribeChannel();
}

然后在集成流定义中通过名称引用这个通道。例如,如果一个服务 activator bean 正在使用这个通道,那么可以在 @ServiceActivator 的 inputChannel 属性中引用它:

@ServiceActovator(inputChannel="orderChannel")

或者,如果使用 Java DSL 配置方式,需要通过调用 channel() 方法引用它:

@Bean
public IntegrationFlow orderFlow() {
    return IntegrationFlows
        ...
        .channel("orderChannel")
        ...
        .get();
}

需要注意的是,如果使用 QueueChannel,则必须为使用者配置一个轮询器。例如,假设已经声明了一个这样的 QueueChannel bean:

@Bean
public MessageChannel orderChannel() {
    return new QueueChannel();
}

需要确保将使用者配置为轮询消息通道。在服务激活器的情况下,@ServiceActivator 注解可能是这样的:

@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))

在本例中,服务激活器每秒(或 1,000 ms)从名为 orderChannel 的通道轮询一次。

最后更新于