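Apache Kafka is a distributed stream-processing platform that is widely used for real-time data pipelines and streaming applications. Kafka offers several programming models for different scenarios. This article covers the main ones: the producer-consumer model, the stream-processing model, the connector model, and the transaction model.
The producer-consumer model is Kafka's most basic programming model and one of its core features. It has two roles: producers and consumers.
A producer publishes messages to a Kafka topic. Any application or service that generates data can act as a producer. It sends messages to a specific topic through Kafka's client API, and Kafka persists those messages on the brokers.
Client libraries exist for many languages, including Java, Python, and Go. The Apache Kafka project itself ships the Java client, and most clients for other languages are maintained separately. In Java, the core producer class is `KafkaProducer`. Here is a simple Java producer: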
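```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker(s) used for the initial connection to the cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys and values are plain strings
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources calls close(), which flushes any records still buffered
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // send() is asynchronous: it buffers the record and returns immediately
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
        }
    }
}
```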
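The loop above calls `send()` without ever checking the outcome. `send()` is asynchronous and returns a `Future<RecordMetadata>`; passing a callback is one way to learn which partition and offset the broker assigned, or why the write failed. The following is a minimal sketch that assumes a local broker and the `my-topic` topic from the previous example. The helper name `sendLogged` is hypothetical, and `acks=all` together with `enable.idempotence=true` is shown as a common durability-oriented choice, not a requirement.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class CallbackProducer {
    // Hypothetical helper: sends one record and logs where the broker stored it, or the error.
    static Future<RecordMetadata> sendLogged(Producer<String, String> producer,
                                             String topic, String key, String value) {
        return producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("send failed: " + exception);
            } else {
                System.out.printf("stored in %s-%d at offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas to acknowledge, and avoid duplicates when retrying
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker has acknowledged this record
            RecordMetadata md = sendLogged(producer, "my-topic", "k1", "hello").get();
            System.out.println("acknowledged offset = " + md.offset());
        }
    }
}
```

Because `sendLogged` takes the `Producer` interface rather than `KafkaProducer`, it can also be exercised in unit tests with the `MockProducer` class from `kafka-clients`, without a running broker.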
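A consumer reads messages from a Kafka topic and processes them. Any application or service that needs the data can act as a consumer. It pulls messages from a specific topic through Kafka's client API and processes them according to its business logic.
The core consumer class is `KafkaConsumer`. Here is a simple Java consumer: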
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Consumers sharing a group.id split the topic's partitions among themselves
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // poll(long) is deprecated; poll(Duration) waits up to 100 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```
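The consumer above relies on the default `enable.auto.commit=true`, where offsets are committed from inside `poll()` on a timer (`auto.commit.interval.ms`), so the commit point is tied to polling rather than to the moment processing finishes. Turning auto-commit off and calling `commitSync()` after each processed batch makes that point explicit: if the process crashes before the commit, the batch is delivered again (at-least-once). A minimal sketch under the same broker, topic and group assumptions as above; `process` and `pollAndCommit` are hypothetical helper names.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ManualCommitConsumer {
    // Hypothetical business logic; replace with real processing
    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }

    // Polls once, processes every record, then synchronously commits the positions
    // reached by that poll. Returns the number of records processed.
    static int pollAndCommit(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        if (!records.isEmpty()) {
            consumer.commitSync(); // only reached after the whole batch was processed
        }
        return records.count();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                pollAndCommit(consumer);
            }
        }
    }
}
```

Taking the `Consumer` interface lets `pollAndCommit` be tested against the `MockConsumer` class that ships with `kafka-clients`.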
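Kafka Streams is a lightweight stream-processing library that ships with Kafka and lets developers process the data in Kafka as continuous streams. Its API supports advanced features such as windowing, state management, and stream-table joins.
The core classes of Kafka Streams are `KafkaStreams` and `StreamsBuilder`. The following simple example converts each message from an input topic to upper case and writes the result to another topic: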
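```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class StreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // application.id is also used as the consumer group id and as the prefix of internal topics
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
        // Read and write keys and values as strings; mapValues(String::toUpperCase) needs String values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        // Transform each value; keys are left unchanged
        KStream<String, String> upperCaseStream = source.mapValues(String::toUpperCase);
        upperCaseStream.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close cleanly on shutdown (e.g. Ctrl+C) so state and offsets are flushed
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```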
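The stream-table join mentioned earlier can be sketched as follows: a `KStream` of events is enriched with the latest value a `KTable` holds for the same key. The topic names `clicks`, `user-regions` and `clicks-by-region`, and the assumption that both inputs are keyed by user ID, are for illustration only. For the join to work, the two input topics must be co-partitioned (same key and same number of partitions).

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;

public class StreamTableJoinApp {
    // Hypothetical topics: "clicks" (userId -> page) and "user-regions" (userId -> region)
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks =
                builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()));
        // The table keeps only the latest region per user
        KTable<String, String> regions =
                builder.table("user-regions", Consumed.with(Serdes.String(), Serdes.String()));

        // Inner join: each click is combined with the current region for the same key;
        // clicks whose key has no table entry yet produce no output
        clicks.join(regions, (page, region) -> page + "@" + region)
              .to("clicks-by-region", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-app");
        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

With `join`, a click is dropped while no region is known for its user; `leftJoin` would instead emit it with a `null` region. Returning the `Topology` from a separate method also makes it easy to test with `TopologyTestDriver` from the `kafka-streams-test-utils` artifact.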
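Kafka Streams supports time-based windows, including tumbling, hopping, sliding, and session windows. The following example counts records per key in 5-minute tumbling windows: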
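```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
import java.util.Properties;

public class WindowedStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-streams-app");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.groupByKey()
              // 5-minute tumbling windows (TimeWindows.of(...) is deprecated since Kafka 3.0)
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
              .count()
              .toStream()
              // The result key is Windowed<String> and the value is a Long; flatten the key
              // to "key@windowStartMs" so both can be written with plain serdes
              .map((windowedKey, count) -> KeyValue.pair(
                      windowedKey.key() + "@" + windowedKey.window().start(), count))
              .to("windowed-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```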
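The tumbling window above is only one of the window types listed. The sketch below shows how the other definitions can be built with the factory methods introduced in Kafka 3.0, and wires the session window into a per-key count. The class name, topic names and the `key@start-end` output format are assumptions made for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;

public class WindowTypes {
    // Hopping: 5-minute windows that start every minute, so a record falls into up to 5 windows
    static final TimeWindows HOPPING =
            TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1));
    // Sliding: windows covering records whose timestamps are at most 5 minutes apart
    static final SlidingWindows SLIDING =
            SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));
    // Session: a key's session ends after 30 seconds without new records
    static final SessionWindows SESSION =
            SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(30));

    // Counts records per key and session; HOPPING or SLIDING can be passed to windowedBy the same way
    static Topology sessionCounts() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(SESSION)
               .count()
               .toStream()
               // Sessions absorbed by a merge are deleted with a null count; skip those
               .filter((windowedKey, count) -> count != null)
               // Flatten Windowed<String> to "key@start-end" so a String serde can write it
               .map((windowedKey, count) -> KeyValue.pair(
                       windowedKey.key() + "@" + windowedKey.window().start()
                               + "-" + windowedKey.window().end(), count))
               .to("session-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Unlike tumbling and hopping windows, session windows have no fixed size: when a new record falls within the inactivity gap of an existing session, the two are merged and the shorter session is deleted, which is why the `filter` step is needed.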
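Kafka Connect is a tool that ships with Kafka for importing data into Kafka from other systems and exporting data from Kafka to them. A wide range of connectors is available for integrating with different data sources and sinks.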
The core concepts of Kafka Connect are connectors and tasks. A connector manages the creation and scheduling of its tasks, while the tasks perform the actual data transfer. Connectors are normally configured rather than coded: with a Connect worker running, a connector is created by submitting its configuration. The following example shows how to use FileStreamSourceConnector to import the lines of a file into Kafka by registering it through the worker's REST API. It assumes a worker is already running with its REST interface on the default port 8083; the connector name, file path and topic are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterFileSource {
    public static void main(String[] args) throws Exception {
        // FileStreamSourceConnector reads the file line by line and writes each line to "topic".
        // The name, file path and topic below are placeholder values.
        String body = "{"
                + "\"name\": \"local-file-source\","
                + "\"config\": {"
                + "\"connector.class\": \"org.apache.kafka.connect.file.FileStreamSourceConnector\","
                + "\"tasks.max\": \"1\","
                + "\"file\": \"/tmp/test.txt\","
                + "\"topic\": \"connect-test\""
                + "}}";

        // POST /connectors on the worker's REST API creates the connector.
        // Depending on the Kafka version, the connect-file JAR may need to be on the worker's plugin.path.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // 201 Created on success; the response body echoes the connector's config and tasks
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```
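Writing a connector of your own makes the connector/task split concrete. The sketch below is a hypothetical `CounterSourceConnector` (not part of Kafka; it needs the `connect-api` artifact). The connector side does no data transfer at all: in `taskConfigs` it only divides a comma-separated `shards` setting among at most `tasks.max` tasks. Each task's `poll()` then produces the records. The setting names `shards` and `assigned.shards` are made up for illustration, and the task does not resume from stored offsets after a restart; a real task would typically read its last offset in `start()` via `context.offsetStorageReader()`.

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example connector: each task emits an increasing counter for its assigned shards.
public class CounterSourceConnector extends SourceConnector {
    private Map<String, String> props;

    @Override
    public void start(Map<String, String> props) {
        this.props = props; // connector configuration submitted by the user
    }

    @Override
    public Class<? extends Task> taskClass() {
        return CounterSourceTask.class;
    }

    // Connector role: split the work into at most maxTasks task configurations (round-robin)
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String[] shards = props.get("shards").split(",");
        int groups = Math.min(maxTasks, shards.length);
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            List<String> assigned = new ArrayList<>();
            for (int s = i; s < shards.length; s += groups) {
                assigned.add(shards[s]);
            }
            Map<String, String> cfg = new HashMap<>(props);
            cfg.put("assigned.shards", String.join(",", assigned));
            configs.add(cfg);
        }
        return configs;
    }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("shards", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Comma-separated shard names")
                .define("topic", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Destination topic");
    }

    @Override
    public String version() {
        return "0.1";
    }

    // Task role: produce the records for the shards this task was given
    public static class CounterSourceTask extends SourceTask {
        private String topic;
        private String[] shards;
        private long counter;

        @Override
        public void start(Map<String, String> props) {
            topic = props.get("topic");
            shards = props.get("assigned.shards").split(",");
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            Thread.sleep(1000); // throttle: one batch per second
            List<SourceRecord> records = new ArrayList<>();
            for (String shard : shards) {
                records.add(new SourceRecord(
                        Collections.singletonMap("shard", shard),     // source partition
                        Collections.singletonMap("counter", counter), // source offset
                        topic, Schema.STRING_SCHEMA, shard + ":" + counter));
            }
            counter++;
            return records;
        }

        @Override
        public void stop() { }

        @Override
        public String version() {
            return "0.1";
        }
    }
}
```

With `shards=a,b,c` and `tasks.max=2`, the connector would produce two task configurations, one assigned `a,c` and the other `b`; the Connect framework then starts one `CounterSourceTask` per configuration.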