溫馨提示×

springboot 整合 kafka 配置文件詳解

小樊
229
2024-12-14 22:35:33
欄目: 大數據

在Spring Boot項目中整合Kafka主要涉及添加依賴、配置文件以及相關類的定義。以下是基于Spring Boot 2.x版本整合Kafka的詳細步驟和配置文件詳解:

依賴添加

pom.xml文件中添加Spring Kafka組件的依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

配置文件

application.propertiesapplication.yml文件中配置Kafka的相關屬性。以下是一個基本的配置示例:

application.properties

# 第一個Kafka實例的配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group1

# 第二個Kafka實例的配置
spring.kafka.second.bootstrap-servers=localhost:9093
spring.kafka.second.consumer.group-id=group2

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
    consumer:
      group-id:
        - group1
        - group2

創建KafkaTemplate和消費者監聽器

在配置類中創建KafkaTemplate Bean對象和消費者監聽器。

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @KafkaListener(topics = "topic1", groupId = "group1")
    public void consumeMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    @KafkaListener(topics = "topic2", groupId = "group2")
    public void consumeMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

通過以上步驟,您可以在Spring Boot項目中成功整合Kafka,并實現消息的生產和消費。

以上信息僅供參考,具體配置可能會根據實際需求有所不同。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女