溫馨提示×

kafka依賴怎樣正確配置

小樊
135
2024-12-14 02:06:42
欄目: 大數據

要正確配置Kafka依賴,請按照以下步驟操作:

  1. 添加Kafka客戶端依賴

在Maven項目的pom.xml文件中,添加Kafka客戶端的依賴項。將以下代碼添加到<dependencies>標簽內:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version> <!-- 請使用適合您項目的版本 -->
</dependency>

對于Gradle項目,在build.gradle文件的dependencies部分添加以下代碼:

implementation 'org.apache.kafka:kafka-clients:2.8.0' // 請使用適合您項目的版本
  1. 配置Kafka屬性

在項目的配置文件(如application.properties或application.yml)中,添加Kafka相關配置。以下是一些常用配置示例:

application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

請根據您的實際需求修改這些配置,例如Kafka服務器地址、消費者組ID、序列化/反序列化器等。

  1. 在代碼中使用Kafka

在您的項目中創建Kafka生產者(Producer)和消費者(Consumer)類,并使用上面配置的屬性進行實例化。以下是一個簡單的示例:

Kafka生產者示例(Producer):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        producer.close();
    }
}

Kafka消費者示例(Consumer):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

確保Kafka服務器正在運行,并根據需要修改生產者和消費者代碼中的主題(topic)名稱?,F在,您已經正確配置了Kafka依賴并在項目中使用了Kafka生產者與消費者。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女