在C#中開發Kafka客戶端,您可以使用Confluent.Kafka
庫。這個庫提供了對Apache Kafka的完整支持,包括生產者和消費者。以下是一個簡單的示例,展示了如何使用Confluent.Kafka
庫創建一個生產者和一個消費者。
首先,您需要安裝Confluent.Kafka
庫。在Visual Studio中,您可以通過NuGet包管理器來安裝:
Install-Package Confluent.Kafka
接下來,創建一個生產者:
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaProducer
{
class Program
{
static async Task Main(string[] args)
{
var conf = new ProducerConfig
{
BootstrapServers = "localhost:9092",
KeySerializer = typeof(string).AssemblyQualifiedName,
ValueSerializer = typeof(string).AssemblyQualifiedName
};
using (var producer = new Producer<string, string>(conf))
{
await producer.ProduceAsync("my-topic", new Message<string, string> { Key = "key", Value = "value" });
}
}
}
}
然后,創建一個消費者:
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaConsumer
{
class Program
{
static async Task Main(string[] args)
{
var conf = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "my-group",
KeyDeserializer = typeof(string).AssemblyQualifiedName,
ValueDeserializer = typeof(string).AssemblyQualifiedName
};
using (var consumer = new Consumer<string, string>(conf))
{
consumer.Subscribe(new[] { "my-topic" });
while (true)
{
var msg = await consumer.ConsumeAsync();
Console.WriteLine($"Received message: key={msg.Key}, value={msg.Value}");
}
}
}
}
}
這個示例中,生產者將一個鍵值對發送到名為my-topic
的主題,消費者則從該主題接收消息。請注意,您需要根據您的Kafka集群配置修改BootstrapServers
和其他設置。