# 8.3.3 编写 Kafka 监听器

除了 send() 和 sendDefault() 的惟一方法签名之外，KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。

对于 Kafka，消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener，其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。

{% code title="程序清单 8.9 使用 @KafkaListener 接收订单" %}

```java
package tacos.kitchen.messaging.kafka.listener;
​
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
​
@Component
public class OrderListener {
    private KitchenUI ui;
    
    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }
    
    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}
```

{% endcode %}

handle() 方法由 @KafkaListener 注解，表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的，只为 handle() 方法提供了一个 Order（payload）参数 。但是，如果需要来自消息的其他元数据，它也可以接受一个 ConsumerRecord 或 Message 对象。

例如，handle() 的以下实现接受一个 ConsumerRecord，这样就可以记录消息的分区和时间戳：

```java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
    log.info("Received from partition {} with timestamp {}",
             record.partition(), record.timestamp());
    ui.displayOrder(order);
}
```

类似地，可以使用 Message 而不是 ConsumerRecord，并达到同样的效果：

```java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
             headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
             headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
}
```

值得注意的是，消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order，而不是直接将其作为 handle() 的参数。
