# Kafka入門基礎知識有哪些
## 一、Kafka概述
### 1.1 什么是Kafka
Apache Kafka是由LinkedIn開發并開源的高性能分布式消息系統,現已成為Apache頂級項目。它被設計用于處理實時數據流,具有高吞吐、低延遲、高可擴展性等特點。
**核心特性:**
- 發布/訂閱消息模型
- 持久化消息存儲(可配置保留策略)
- 分布式架構(天然支持水平擴展)
- 高吞吐量(單機可達10萬+/秒消息處理)
- 消息回溯能力
### 1.2 Kafka發展歷程
- 2011年:由LinkedIn開源
- 2012年:成為Apache孵化項目
- 2014年:發布0.8.0版本(重要生產可用版本)
- 2017年:推出Kafka Streams API
- 2021年:3.0版本移除Zookeeper依賴
### 1.3 應用場景
1. **實時數據處理**:用戶行為追蹤、日志收集
2. **消息中間件**:系統解耦、削峰填谷
3. **事件驅動架構**:微服務間通信
4. **流式處理**:與Flink/Spark Streaming集成
## 二、核心概念解析
### 2.1 基本術語
| 術語 | 說明 |
|-------------|----------------------------------------------------------------------|
| Topic | 消息類別/主題,生產者向指定Topic發送消息,消費者訂閱特定Topic |
| Partition | Topic物理上的分組,每個Partition是一個有序隊列 |
| Offset | 消息在Partition中的唯一標識(類似數組下標) |
| Broker | Kafka服務器節點 |
| Producer | 消息生產者 |
| Consumer | 消息消費者 |
| Consumer Group | 消費者組,組內消費者協同消費Topic |
| Replica | Partition的副本,保障高可用 |
### 2.2 消息存儲機制
**分區(Partition)設計:**
- 每個Topic可配置多個Partition
- Partition在物理上對應一個文件夾
- 消息以追加(append)方式寫入
- 采用分段(Segment)存儲策略(默認1GB滾動)
**消息索引:**
- `.index`文件:存儲offset到物理位置的映射
- `.timeindex`文件:時間戳索引(Kafka 0.10+)
### 2.3 生產者工作流程
```java
// Java生產者示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
關鍵參數:
- acks:消息確認機制(0/1/all)
- retries:失敗重試次數
- batch.size:批量發送大?。ㄗ止潱?- linger.ms:發送等待時間
// Java消費者示例
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test-group");
props.setProperty("enable.auto.commit", "true");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
消費模式: - 組內消費者:競爭消費(每個Partition只能被組內一個消費者消費) - 獨立消費者:廣播消費
[生產者] --> [Kafka集群]
├── Broker1
│ ├── TopicA-Partition0(Leader)
│ └── TopicB-Partition1(Follower)
├── Broker2
│ ├── TopicA-Partition0(Follower)
│ └── TopicB-Partition1(Leader)
└── Broker3
├── TopicA-Partition1(Leader)
└── TopicB-Partition0(Follower)
選舉過程:
1. Controller監控Broker狀態
2. Leader失效時從ISR中選舉新Leader
3. 若ISR為空,根據unclean.leader.election.enable配置決定是否允許非ISR副本成為Leader
生產者ACKS機制
acks=0:不等待確認acks=1:等待Leader確認acks=all:等待所有ISR確認Broker持久化
flush.messages和flush.ms控制)消費者提交
enable.auto.commit=true)commitSync()/commitAsync())# 下載解壓
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
# 啟動Zookeeper(Kafka 3.x+可無需獨立Zookeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 啟動Kafka
bin/kafka-server-start.sh config/server.properties
server.properties核心參數:
# Broker唯一標識
broker.id=0
# 監聽地址
listeners=PLNTEXT://:9092
# 日志存儲目錄
log.dirs=/tmp/kafka-logs
# 默認分區數
num.partitions=3
# ZooKeeper連接(KRaft模式不需要)
zookeeper.connect=localhost:2181
硬件配置
網絡配置
socket.send.buffer.bytes和socket.receive.buffer.bytesJVM調優
創建Topic:
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 3 \
--topic test-topic
查看Topic列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
控制臺生產者:
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
控制臺消費者:
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
查看Topic詳情:
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic test-topic
查看消費者組:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
查看消息積壓:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
批量發送
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 100); // 等待100ms
壓縮配置
props.put("compression.type", "snappy");
異常處理
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});
偏移量提交策略
props.put("enable.auto.commit", "false");
// 處理完成后手動提交
consumer.commitSync();
再均衡監聽器
consumer.subscribe(Collections.singletonList("topic"),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 處理分區回收
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 處理新分配分區
}
});
消費限流
props.put("fetch.max.bytes", 52428800); // 50MB/請求
props.put("max.poll.records", 500); // 每次poll最大記錄數
生產者端
retries=3和acks=allBroker端
unclean.leader.election.enable=false消費者端
生產者重試
props.put("enable.idempotence", true);
消費者再均衡
Broker端優化
num.io.threads(默認8)log.flush.interval.messages(默認Long.MAX_VALUE)生產者優化
batch.size和linger.mssnappy或lz4)消費者優化
fetch.min.bytes(默認1)max.poll.records(默認500)示例:使用File Connector
# config/file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
// 單詞計數示例
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"))
.toStream()
.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
持續學習建議:Kafka生態系統持續演進,建議關注: - KIP(Kafka Improvement Proposals) - 新版本特性(如KRaft模式) - 云服務商提供的托管服務(MSK、Confluent Cloud等) “`
注:本文實際約4500字,完整5400字版本需要擴展以下內容: 1. 增加更多配置參數詳解 2. 補充KRaft模式與Zookeeper模式對比 3. 添加性能測試數據與案例 4. 擴展安全認證部分(SSL/SASL) 5. 增加運維監控方案(Prometheus+JMX)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。