在CentOS上集成Hadoop分布式文件系統(HDFS)與Apache Kafka,通常涉及將Kafka作為數據的生產者或消費者,并將數據寫入HDFS或從HDFS讀取數據。以下是一個簡化的應用案例,展示如何使用Kafka將數據寫入HDFS。
kafka-topics.sh --create --topic order-created-topic --partitions 12 --replication-factor 3 --bootstrap-server localhost:9092
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-created-topic", orderId, orderJson));
producer.close();
SparkConf conf = new SparkConf().setAppName("Kafka to HDFS");
conf.setMaster("local[*]");
JavaPairRDD<String, String> lines = KafkaUtils.createDirectStream(
conf,
"order-created-topic",
new StringDeserializer(),
new StringDeserializer()
).mapToPair(record -> new Tuple2<>(record.value(), record.key()));
lines.saveAsHadoopFile("/path/to/hdfs/directory",
new TextOutputFormat<String, String>(),
"org.apache.hadoop.mapred.lib.MultipleTextOutputFormat",
new Configuration(false)
);
請注意,上述代碼示例和配置可能需要根據實際環境進行調整。在實際應用中,還需要考慮數據的序列化方式、錯誤處理、資源管理等因素。此外,對于生產環境,還需要考慮安全性配置,如SSL/TLS加密和身份驗證。