Kafka配置中的線程數主要涉及服務端和客戶端,具體設置如下:
網絡請求處理線程
num.network.threads
:處理網絡請求的線程數,建議設置為CPU核數的2-3倍。num.network.threads=8
(8核服務器可設為8-12)。磁盤IO處理線程
num.io.threads
:處理磁盤讀寫的線程數,建議設置為CPU核數的50%,不超過總核數。num.io.threads=4
(8核服務器可設為4-6)。分區恢復線程
num.recovery.threads.per.data.dir
:每個數據目錄的恢復線程數,通常設置為1-3,用于日志恢復和清理。num.recovery.threads.per.data.dir=2
。生產者多線程
ExecutorService producerExecutor = Executors.newFixedThreadPool(10); // 10個生產者線程
producerExecutor.submit(() -> {
// 生產者發送邏輯
});
消費者多線程
createMessageStreams
的topicCountMap
設置每個主題的消費線程數,需確保線程數≤分區數。Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("topic1", 3); // 3個線程消費topic1
ConcurrentKafkaListenerContainerFactory
的concurrency
參數,需與分區數匹配。@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(3); // 每個消費者實例的線程數(不超過分區數)
return factory;
}
參數來源: