> For the complete documentation index, see [llms.txt](https://potoyang.gitbook.io/spring-in-action-v5/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://potoyang.gitbook.io/spring-in-action-v5/di-8-zhang-fa-song-yi-bu-xiao-xi/8.3-shi-yong-kafka-fa-song-xiao-xi/8.3.3-bian-xie-kafka-jian-ting-qi.md).

# 8.3.3 编写 Kafka 监听器

除了 send() 和 sendDefault() 的惟一方法签名之外，KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。

对于 Kafka，消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener，其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。

{% code title="程序清单 8.9 使用 @KafkaListener 接收订单" %}

```java
package tacos.kitchen.messaging.kafka.listener;
​
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
​
@Component
public class OrderListener {
    private KitchenUI ui;
    
    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }
    
    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}
```

{% endcode %}

handle() 方法由 @KafkaListener 注解，表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的，只为 handle() 方法提供了一个 Order（payload）参数 。但是，如果需要来自消息的其他元数据，它也可以接受一个 ConsumerRecord 或 Message 对象。

例如，handle() 的以下实现接受一个 ConsumerRecord，这样就可以记录消息的分区和时间戳：

```java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
    log.info("Received from partition {} with timestamp {}",
             record.partition(), record.timestamp());
    ui.displayOrder(order);
}
```

类似地，可以使用 Message 而不是 ConsumerRecord，并达到同样的效果：

```java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
             headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
             headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
}
```

值得注意的是，消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order，而不是直接将其作为 handle() 的参数。


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://potoyang.gitbook.io/spring-in-action-v5/di-8-zhang-fa-song-yi-bu-xiao-xi/8.3-shi-yong-kafka-fa-song-xiao-xi/8.3.3-bian-xie-kafka-jian-ting-qi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
