Kafka TimeoutException 通常是由于客戶端與 Kafka 服務器之間的通信超時引起的。這可能是由于網絡問題、服務器負載過高或者客戶端配置不當等原因導致的。為了優化連接池,你可以嘗試以下方法:
connection.timeout.ms
和 request.timeout.ms
。Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("connection.timeout.ms", "60000"); // 增加連接超時時間到 60 秒
props.put("request.timeout.ms", "60000"); // 增加請求超時時間到 60 秒
session.timeout.ms
。props.put("session.timeout.ms", "30000"); // 增加會話超時時間到 30 秒
max.partition.fetchers
和 max.in.flight.requests.per.connection
來實現。props.put("max.partition.fetchers", "16"); // 增加最大分區獲取者數
props.put("max.in.flight.requests.per.connection", "5"); // 增加每個連接的最大未確認請求數
Properties props = new Properties();
// ... 其他配置 ...
// 創建 Kafka 消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 使用連接池
consumer.setPartitionsPerTopic(10); // 每個主題的分區數
consumer.setFetchMaxBytes(1048576); // 每次拉取的最大字節數
consumer.setFetchMinBytes(1); // 每次拉取的最小字節數
consumer.setFetchWaitMaxMs(500); // 拉取等待的最大時間
consumer.setMaxPollRecords(500); // 每次輪詢的最大記錄數
consumer.setMaxPartitionFetchBytes(1048576); // 每個分區拉取的最大字節數
通過以上方法,你可以嘗試優化 Kafka 客戶端的連接池配置,以減少 TimeoutException 的發生。