最后更新于
这有帮助吗?
最后更新于
这有帮助吗?
除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。
对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。
handle() 方法由 @KafkaListener 注解,表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的,只为 handle() 方法提供了一个 Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个 ConsumerRecord 或 Message 对象。
例如,handle() 的以下实现接受一个 ConsumerRecord,这样就可以记录消息的分区和时间戳:
类似地,可以使用 Message 而不是 ConsumerRecord,并达到同样的效果:
值得注意的是,消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order,而不是直接将其作为 handle() 的参数。