溫馨提示×

golang操作kafka怎樣實現

小樊
98
2024-12-17 11:05:41
欄目: 編程語言

在Golang中操作Kafka,你可以使用第三方庫,如saramaconfluent-kafka-go。這里我將為你提供一個使用sarama庫的示例。首先,你需要安裝sarama庫:

go get github.com/Shopify/sarama

接下來,我將為你提供一個簡單的示例,展示如何使用sarama庫創建一個Kafka生產者和一個消費者。

  1. 創建一個Kafka生產者:
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("Error creating producer: %v", err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Fatalf("Error closing producer: %v", err)
		}
	}()

	topic := "test_topic"
	message := "Hello, Kafka!"

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(message),
	})
	if err != nil {
		log.Printf("Error sending message: %v", err)
		return
	}

	fmt.Printf("Message sent to topic: %s, partition: %d, offset: %d\n", topic, partition, offset)
}
  1. 創建一個Kafka消費者:
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V2_6_0

	brokers := []string{"localhost:9092"}
	groupID := "test_group"

	consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalf("Error closing consumer: %v", err)
		}
	}()

	topic := "test_topic"
	handler := exampleConsumerGroupHandler{}

	go func() {
		for {
			err := consumer.Consume(context.Background(), []string{topic}, handler)
			if err != nil {
				log.Printf("Error consuming messages: %v", err)
			}
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals

	consumer.GracefulStop()
}

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message: %s, Partition: %d, Offset: %d\n", string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}

	return nil
}

這個示例中,我們創建了一個Kafka生產者,將一條消息發送到名為test_topic的主題。然后,我們創建了一個Kafka消費者,訂閱了相同的主題,并在接收到消息時打印其內容。

注意:在運行這些示例之前,請確保你已經啟動了一個Kafka實例。你可以使用Docker運行一個簡單的Kafka實例,如下所示:

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=localhost:9092 -e KAFKA_BROKER_ID=1 confluentinc/cp-kafka:2.6.0

這將啟動一個名為kafka的Docker容器,監聽本地的9092端口。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女