溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

@KafkaListener怎么使用

發布時間:2023-02-25 11:58:43 來源:億速云 閱讀:288 作者:iii 欄目:開發技術

@KafkaListener怎么使用

在現代的分布式系統中,消息隊列扮演著至關重要的角色。Apache Kafka 高吞吐量、分布式的消息系統,被廣泛應用于日志收集、流處理、事件驅動架構等場景。Spring Kafka 是 Spring 框架對 Kafka 的集成,提供了簡潔的 API 來與 Kafka 進行交互。其中,@KafkaListener 注解是 Spring Kafka 中用于監聽 Kafka 消息的核心注解之一。本文將詳細介紹 @KafkaListener 的使用方法,幫助開發者更好地理解和應用這一功能。

1. 什么是 @KafkaListener

@KafkaListener 是 Spring Kafka 提供的一個注解,用于標記一個方法作為 Kafka 消息的監聽器。當 Kafka 主題中有新消息到達時,被注解的方法會被自動調用,從而處理這些消息。通過 @KafkaListener,開發者可以輕松地將 Kafka 消息與業務邏輯進行綁定,實現消息的消費和處理。

2. 基本用法

2.1 添加依賴

首先,在 Spring Boot 項目中,我們需要添加 spring-kafka 依賴。在 pom.xml 文件中添加以下依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2 配置 Kafka 消費者

application.propertiesapplication.yml 文件中配置 Kafka 消費者的相關屬性:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

2.3 使用 @KafkaListener 注解

接下來,我們可以在 Spring 管理的 Bean 中使用 @KafkaListener 注解來監聽 Kafka 主題中的消息。以下是一個簡單的示例:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

在這個示例中,listen 方法被標記為 @KafkaListener,并且指定了要監聽的 Kafka 主題 my-topic 和消費者組 my-group。當 my-topic 主題中有新消息到達時,listen 方法會被自動調用,并將消息內容作為參數傳遞給方法。

3. 高級用法

3.1 監聽多個主題

@KafkaListener 注解允許同時監聽多個主題??梢酝ㄟ^ topics 屬性指定多個主題名稱,或者使用 topicPattern 屬性通過正則表達式匹配多個主題:

@KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group")
public void listenMultipleTopics(String message) {
    System.out.println("Received message from multiple topics: " + message);
}

@KafkaListener(topicPattern = "my-topic-.*", groupId = "my-group")
public void listenTopicPattern(String message) {
    System.out.println("Received message from topic pattern: " + message);
}

3.2 處理消息頭

Kafka 消息可以包含消息頭(headers),這些消息頭可以用于傳遞額外的元數據信息。@KafkaListener 允許通過 @Header 注解來獲取消息頭:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithHeaders(String message, @Header("custom-header") String customHeader) {
        System.out.println("Received message: " + message);
        System.out.println("Custom header: " + customHeader);
    }
}

3.3 處理消息鍵

Kafka 消息可以包含一個鍵(key),用于分區和消息路由。@KafkaListener 允許通過 @Header(KafkaHeaders.RECEIVED_KEY) 注解來獲取消息鍵:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithKey(String message, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
        System.out.println("Received message: " + message);
        System.out.println("Message key: " + key);
    }
}

3.4 處理消息分區和偏移量

@KafkaListener 還允許獲取消息的分區和偏移量信息??梢酝ㄟ^ @Header(KafkaHeaders.RECEIVED_PARTITION_ID)@Header(KafkaHeaders.OFFSET) 注解來獲取這些信息:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithPartitionAndOffset(String message,
                                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                            @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("Received message: " + message);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
    }
}

3.5 手動提交偏移量

默認情況下,Spring Kafka 會自動提交消費者的偏移量。但在某些場景下,開發者可能需要手動控制偏移量的提交??梢酝ㄟ^設置 enableAutoCommitfalse 并手動調用 Acknowledgment 對象的 acknowledge() 方法來實現:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listenWithManualAck(String message, Acknowledgment ack) {
        System.out.println("Received message: " + message);
        // 處理消息
        ack.acknowledge(); // 手動提交偏移量
    }
}

在配置類中,需要定義一個 ConcurrentKafkaListenerContainerFactory 并設置 enableAutoCommitfalse

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

3.6 批量消費

在某些場景下,可能需要一次性消費多條消息??梢酝ㄟ^設置 batchListenertrue 來實現批量消費:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaBatchListenerContainerFactory")
    public void listenBatch(List<String> messages) {
        System.out.println("Received batch of messages: " + messages);
    }
}

在配置類中,需要定義一個 ConcurrentKafkaListenerContainerFactory 并設置 batchListenertrue

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}

3.7 異常處理

在消費 Kafka 消息時,可能會遇到各種異常情況。Spring Kafka 提供了多種方式來處理這些異常??梢酝ㄟ^ errorHandler 屬性指定一個自定義的異常處理器:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", errorHandler = "myErrorHandler")
    public void listenWithErrorHandler(String message) {
        System.out.println("Received message: " + message);
        throw new RuntimeException("Simulated error");
    }
}

在配置類中,定義一個自定義的異常處理器:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Component;

@Component
public class MyErrorHandler implements ErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
        System.err.println("Error occurred while processing message: " + data.value());
        System.err.println("Exception: " + thrownException.getMessage());
    }
}

4. 總結

@KafkaListener 是 Spring Kafka 中用于監聽 Kafka 消息的核心注解,提供了豐富的功能來滿足不同的消費需求。通過本文的介紹,開發者可以掌握 @KafkaListener 的基本用法和高級特性,包括監聽多個主題、處理消息頭和鍵、手動提交偏移量、批量消費以及異常處理等。希望本文能夠幫助開發者更好地理解和應用 @KafkaListener,從而構建高效、可靠的 Kafka 消費者應用。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女