在Linux中,使用Golang實現消息隊列有多種方法。以下是一些常見的實現方式:
RabbitMQ是一個廣泛使用的消息代理和隊列服務器,支持多種消息協議。
首先,你需要在Linux上安裝RabbitMQ。可以使用以下命令:
sudo apt-get update
sudo apt-get install rabbitmq-server
你可以使用streadway/amqp
庫來與RabbitMQ交互。
go get github.com/streadway/amqp
以下是一個簡單的生產者示例:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// failOnError does nothing when err is nil. Otherwise it logs msg followed by
// the error text via log.Fatalf, which terminates the process.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main connects to the RabbitMQ broker at localhost:5672, declares the durable
// "hello" queue and publishes one text/plain message to it, using an empty
// exchange name and the queue name as the routing key. Any failure aborts the
// program through failOnError.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")

	// Printf rather than Println: Println does not interpret format verbs and
	// would print the "%s" literally instead of substituting body.
	fmt.Printf(" [x] Sent %s\n", body)
}
消費者示例:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// failOnError is a no-op for a nil err. For a non-nil err it logs
// "<msg>: <err>" with log.Fatalf, which ends the program.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// main connects to the RabbitMQ broker at localhost:5672, declares the durable
// "hello" queue, registers an auto-acknowledging consumer on it and prints the
// body of every delivery. It then blocks indefinitely, so the program runs
// until it is interrupted.
func main() {
	connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	queue, err := channel.QueueDeclare(
		"hello", // name
		true,    // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	deliveries, err := channel.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	failOnError(err, "Failed to register a consumer")

	// Drain the delivery channel in the background so main can print the
	// banner and then park itself.
	printDeliveries := func() {
		for delivery := range deliveries {
			fmt.Printf("Received a message: %s\n", delivery.Body)
		}
	}
	go printDeliveries()

	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")

	// Nothing ever sends on this channel; receiving from it keeps main alive.
	block := make(chan bool)
	<-block
}
Kafka是一個分布式流處理平臺,也可以用作消息隊列。
你可以使用以下命令安裝Kafka:
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
啟動Zookeeper和Kafka服務器:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
你可以使用confluent-kafka-go
庫來與Kafka交互。
go get github.com/confluentinc/confluent-kafka-go/kafka
生產者示例:
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main creates a Kafka producer for the broker at localhost:9092, sends one
// message to "test-topic" and waits up to 15 seconds for outstanding
// deliveries before exiting. Delivery results are printed by a background
// goroutine that reads the producer's event channel.
func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Report the outcome of every produced message.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	topic := "test-topic"
	// Produce only enqueues the message; a non-nil error means it was never
	// queued, so no delivery report will follow for it.
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello Kafka"),
	}, nil)
	if err != nil {
		panic(err)
	}

	// Flush returns the number of events still outstanding when the timeout
	// (in milliseconds) expires; a non-zero value means delivery is unconfirmed.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("%d event(s) still pending after flush timeout\n", remaining)
	}
}
消費者示例:
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main creates a Kafka consumer in group "test-consumer-group" for the broker
// at localhost:9092, subscribes to "test-topic" and prints the value of every
// message it reads. The read loop never terminates; read errors are printed
// and the loop continues.
func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "test-consumer-group",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Fail on a subscription error rather than entering the read loop
	// with the failure unreported.
	if err := c.SubscribeTopics([]string{"test-topic"}, nil); err != nil {
		panic(err)
	}

	for {
		// A timeout of -1 blocks until a message or an error is available.
		msg, err := c.ReadMessage(-1)
		if err != nil {
			fmt.Printf("Consumer error: %v\n", err)
			continue
		}
		fmt.Printf("Received message: %s\n", string(msg.Value))
	}
}
Redis也可以用作消息隊列,特別是使用其發布/訂閱功能。需要注意的是,發布/訂閱不會持久化消息:訂閱者離線期間發布的消息會丟失;如果需要可靠投遞,可以改用Redis的List或Streams。
你可以使用以下命令安裝Redis:
sudo apt-get update
sudo apt-get install redis-server
你可以使用go-redis
庫來與Redis交互。
go get github.com/go-redis/redis/v8
生產者示例:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
// ctx is the background context passed to every Redis call in this program.
var ctx = context.Background()

// main publishes the string "Hello Redis" on the Redis channel named "channel"
// using a client for localhost:6379, and panics if the publish fails.
func main() {
	options := &redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	}
	client := redis.NewClient(options)

	if err := client.Publish(ctx, "channel", "Hello Redis").Err(); err != nil {
		panic(err)
	}
	fmt.Println("Message published")
}
消費者示例:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
// ctx is the background context passed to every Redis call in this program.
var ctx = context.Background()

// main subscribes to the Redis channel named "channel" using a client for
// localhost:6379 and prints the payload of each message received until the
// message channel is closed.
func main() {
	options := &redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	}
	client := redis.NewClient(options)

	subscription := client.Subscribe(ctx, "channel")
	defer subscription.Close()

	messages := subscription.Channel()
	for {
		message, open := <-messages
		if !open {
			return
		}
		fmt.Printf("Received message: %s\n", message.Payload)
	}
}
這些示例展示了如何在Linux中使用Golang實現消息隊列。你可以根據自己的需求選擇合適的消息隊列系統,并使用相應的Golang客戶端庫進行開發。