Ubuntu環境下實現Hadoop實時數據處理的技術方案
傳統Hadoop MapReduce以批處理為核心,無法滿足實時數據處理需求。需結合流處理框架(如Flink、Spark Streaming)與實時數據管道(如Kafka),構建“采集-傳輸-處理-存儲”的完整實時流水線。以下是具體實施步驟:
sudo apt update
sudo apt install openjdk-11-jdk
驗證安裝:java -version(需顯示Java版本信息)。/usr/local/hadoop-3.3.1),配置核心文件:
core-site.xml:設置HDFS默認文件系統地址(fs.defaultFS=hdfs://localhost:9000);hdfs-site.xml:配置NameNode數據目錄(dfs.namenode.name.dir=/opt/hadoopdata/namenode)與DataNode數據目錄(dfs.datanode.data.dir=/opt/hadoopdata/datanode),并設置副本數(dfs.replication=1,單機環境)。hadoop namenode -format,啟動HDFS:start-dfs.sh(通過jps驗證NameNode、DataNode進程是否運行)。Kafka作為分布式消息隊列,負責實時數據的接收、存儲與傳輸,是連接數據源與流處理框架的關鍵組件。
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
test_topic):bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic),消費者接收消息(bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning)。Flink是原生流處理框架,支持事件時間、狀態管理與容錯,適合實時分析、實時ETL等場景。
~/.bashrc添加export FLINK_HOME=/usr/local/flink-1.13.2、export PATH=$PATH:$FLINK_HOME/bin),啟動集群:bin/start-cluster.sh
驗證:訪問http://localhost:8081查看Flink Web UI。from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
# 配置Kafka消費者
kafka_consumer = FlinkKafkaConsumer(
topics='test_topic',
deserialization_schema=SimpleStringSchema(),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'flink_group'}
)
# 添加數據源
stream = env.add_source(kafka_consumer)
# 數據處理:按空格分割單詞,統計詞頻
result = stream \
.flat_map(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.key_by(lambda x: x[0]) \
.sum(1)
# 輸出到控制臺
result.print()
# 執行任務
env.execute("Kafka Flink WordCount")
提交任務:flink run -c your_module_name your_script.py。Spark Streaming通過微批處理模擬流處理,繼承Spark的易用性與生態(如Hive、HDFS集成),適合已有Spark基礎的場景。
~/.bashrc添加export SPARK_HOME=/usr/local/spark-3.2.1-bin-hadoop3.2、export PATH=$PATH:$SPARK_HOME/bin)。KafkaUtils讀取Kafka數據,進行實時單詞計數:from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="KafkaSparkStreaming")
ssc = StreamingContext(sc, batch_duration=1) # 1秒批處理間隔
# 配置Kafka參數
kafka_params = {"metadata.broker.list": "localhost:9092"}
topics = ["test_topic"]
# 創建Kafka流
kafka_stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)
# 數據處理:提取值,按空格分割單詞,統計詞頻
lines = kafka_stream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 輸出結果
word_counts.pprint()
# 啟動流處理
ssc.start()
ssc.awaitTermination()
提交任務:spark-submit --master local[2] your_script.py。處理后的實時數據可存儲至以下系統,滿足不同場景需求:
FileSystemSink寫入HDFS);parallelism.default)、批處理間隔(Spark Streaming的batchDuration),優化GC策略(如使用G1GC減少停頓時間)。通過以上步驟,可在Ubuntu環境下構建基于Hadoop生態的實時數據處理系統,結合流處理框架的高吞吐與低延遲特性,滿足實時分析、監控等場景需求。