在現代的分布式系統中,消息隊列扮演著至關重要的角色。Apache Kafka 高吞吐量、分布式的消息系統,被廣泛應用于日志收集、流處理、事件驅動架構等場景。Spring Kafka 是 Spring 框架對 Kafka 的集成,提供了簡潔的 API 來與 Kafka 進行交互。其中,@KafkaListener 注解是 Spring Kafka 中用于監聽 Kafka 消息的核心注解之一。本文將詳細介紹 @KafkaListener 的使用方法,幫助開發者更好地理解和應用這一功能。
@KafkaListener 是 Spring Kafka 提供的一個注解,用于標記一個方法作為 Kafka 消息的監聽器。當 Kafka 主題中有新消息到達時,被注解的方法會被自動調用,從而處理這些消息。通過 @KafkaListener,開發者可以輕松地將 Kafka 消息與業務邏輯進行綁定,實現消息的消費和處理。
首先,在 Spring Boot 項目中,我們需要添加 spring-kafka 依賴。在 pom.xml 文件中添加以下依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在 application.properties 或 application.yml 文件中配置 Kafka 消費者的相關屬性:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
接下來,我們可以在 Spring 管理的 Bean 中使用 @KafkaListener 注解來監聽 Kafka 主題中的消息。以下是一個簡單的示例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
在這個示例中,listen 方法被標記為 @KafkaListener,并且指定了要監聽的 Kafka 主題 my-topic 和消費者組 my-group。當 my-topic 主題中有新消息到達時,listen 方法會被自動調用,并將消息內容作為參數傳遞給方法。
@KafkaListener 注解允許同時監聽多個主題??梢酝ㄟ^ topics 屬性指定多個主題名稱,或者使用 topicPattern 屬性通過正則表達式匹配多個主題:
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group")
public void listenMultipleTopics(String message) {
System.out.println("Received message from multiple topics: " + message);
}
@KafkaListener(topicPattern = "my-topic-.*", groupId = "my-group")
public void listenTopicPattern(String message) {
System.out.println("Received message from topic pattern: " + message);
}
Kafka 消息可以包含消息頭(headers),這些消息頭可以用于傳遞額外的元數據信息。@KafkaListener 允許通過 @Header 注解來獲取消息頭:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listenWithHeaders(String message, @Header("custom-header") String customHeader) {
System.out.println("Received message: " + message);
System.out.println("Custom header: " + customHeader);
}
}
Kafka 消息可以包含一個鍵(key),用于分區和消息路由。@KafkaListener 允許通過 @Header(KafkaHeaders.RECEIVED_KEY) 注解來獲取消息鍵:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listenWithKey(String message, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
System.out.println("Received message: " + message);
System.out.println("Message key: " + key);
}
}
@KafkaListener 還允許獲取消息的分區和偏移量信息??梢酝ㄟ^ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) 和 @Header(KafkaHeaders.OFFSET) 注解來獲取這些信息:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listenWithPartitionAndOffset(String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
System.out.println("Received message: " + message);
System.out.println("Partition: " + partition);
System.out.println("Offset: " + offset);
}
}
默認情況下,Spring Kafka 會自動提交消費者的偏移量。但在某些場景下,開發者可能需要手動控制偏移量的提交??梢酝ㄟ^設置 enableAutoCommit 為 false 并手動調用 Acknowledgment 對象的 acknowledge() 方法來實現:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listenWithManualAck(String message, Acknowledgment ack) {
System.out.println("Received message: " + message);
// 處理消息
ack.acknowledge(); // 手動提交偏移量
}
}
在配置類中,需要定義一個 ConcurrentKafkaListenerContainerFactory 并設置 enableAutoCommit 為 false:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
在某些場景下,可能需要一次性消費多條消息??梢酝ㄟ^設置 batchListener 為 true 來實現批量消費:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaBatchListenerContainerFactory")
public void listenBatch(List<String> messages) {
System.out.println("Received batch of messages: " + messages);
}
}
在配置類中,需要定義一個 ConcurrentKafkaListenerContainerFactory 并設置 batchListener 為 true:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
}
在消費 Kafka 消息時,可能會遇到各種異常情況。Spring Kafka 提供了多種方式來處理這些異常??梢酝ㄟ^ errorHandler 屬性指定一個自定義的異常處理器:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group", errorHandler = "myErrorHandler")
public void listenWithErrorHandler(String message) {
System.out.println("Received message: " + message);
throw new RuntimeException("Simulated error");
}
}
在配置類中,定義一個自定義的異常處理器:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Component;
@Component
public class MyErrorHandler implements ErrorHandler {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
System.err.println("Error occurred while processing message: " + data.value());
System.err.println("Exception: " + thrownException.getMessage());
}
}
@KafkaListener 是 Spring Kafka 中用于監聽 Kafka 消息的核心注解,提供了豐富的功能來滿足不同的消費需求。通過本文的介紹,開發者可以掌握 @KafkaListener 的基本用法和高級特性,包括監聽多個主題、處理消息頭和鍵、手動提交偏移量、批量消費以及異常處理等。希望本文能夠幫助開發者更好地理解和應用 @KafkaListener,從而構建高效、可靠的 Kafka 消費者應用。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。