9.2.8 通道适配器
通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。
入站信道的适配器可以采取多种形式,这取决于它们引入到流的数据源。例如,声明一个入站通道适配器,它采用从 AtomicInteger 到流递增的数字。使用 Java 配置,它可能是这样的:
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}
此 @Bean 方法声明了一个入站信道适配器 bean,后面跟随着 @InboundChannelAdapter 注解,它们每 1 秒(1000 ms)从注入的 AtomicInteger 提交一个数字到名 numberChannel 的通道中。
当使用 Java 配置时,@InboundChannelAdapter 意味着是一个入站通道适配器,from() 方法就是使用 Java DSL 来定义流的时候,表明它是怎么处理的。下面对于流定义的一个片段展示了在 Java DSL 配置中类似的输入通道适配器:
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}
通常情况下,通道适配器通过的 Spring Integration 的多端点模块之一进行提供。举个例子,假设需要一个入站通道适配器,用它来监视指定的目录,同时将任何写入到那个目录中的文件作为消息,提交到名为 file-channel 的通道中。下面的 Java 配置使用 FileReadingMessageSource 从 Spring Integration 的文件端点模块来实现这一目标:
@Bean
@InboundChannelAdapter(channel="file-channel",
poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
当在 Java DSL 中写入同样的 file-reading 入站通道适配器时,来自 Files 类的 inboundAdapter() 方法达到的同样的目的。出站通道适配器位于集成信息流的最后位置,将最终消息扇出到应用程序或是其他系统中:
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}
服务激活器(作为消息处理的实现)往往是为出站通道适配器而存在的,特别是当数据需要被扇出到应用程序本身的时候。
值得一提的,Spring Integration 的端点模块为几种常见的用例提供了有用的消息处理程序。如在程序清单 9.3 中所看到的 FileWritingMessageHandler 出站通道适配器,这就是一个很好的例子。说到 Spring Integration 端点模块,让我们快速浏览一下准备使用的集成端点模块。
复制链接