8.3.2 使用 KafkaTemplate 发送消息
在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:
注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。
再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:
发送消息的 topic(send() 方法必要的参数)
写入 topic 的分区(可选)
发送记录的键(可选)
时间戳(可选;默认为 System.currentTimeMillis())
payload(必须)
topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。
对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。
使用 KafkaTemplate 及其 send() 方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。
在 OrderMessagingService 的这个实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法向名为tacocloud.orders.topic 的主题发送 Order。代码中除了使用 "Kafka" 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。
如果设置了默认主题,可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:
然后,在 sendOrder() 方法中,可以调用 sendDefault() 而不是 send(),并且不指定主题名称:
现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。
最后更新于