# Spark MLlib 聚類 KMeans 怎么用
Apache Spark 的 MLlib 是一個強大的機器學習庫,提供了多種聚類算法,其中 KMeans 是最常用的無監督學習算法之一。本文將詳細介紹如何在 Spark MLlib 中使用 KMeans 進行聚類分析。
## 目錄
1. [KMeans 算法簡介](#kmeans-算法簡介)
2. [環境準備](#環境準備)
3. [數據準備](#數據準備)
4. [KMeans 模型訓練](#kmeans-模型訓練)
5. [模型評估](#模型評估)
6. [模型保存與加載](#模型保存與加載)
7. [完整代碼示例](#完整代碼示例)
8. [總結](#總結)
---
## KMeans 算法簡介
KMeans 是一種基于距離的聚類算法,通過迭代將數據點劃分為 K 個簇,使得每個數據點屬于離其最近的簇中心(質心)。算法步驟如下:
1. 隨機初始化 K 個質心
2. 將每個數據點分配到最近的質心
3. 重新計算每個簇的質心(取簇內點的均值)
4. 重復步驟 2-3 直到質心不再變化或達到最大迭代次數
Spark MLlib 實現了分布式版本的 KMeans,適合處理大規模數據集。
---
## 環境準備
確保已安裝以下環境:
- Java 8+
- Spark 3.x
- Scala 2.12 或 Python(PySpark)
Maven 依賴(Scala):
```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.2.1</version>
</dependency>
KMeans 要求輸入數據為 Vector 類型。假設我們有一個文本文件 data.txt,每行是用空格分隔的數值:
1.2 3.4 2.1
0.5 1.2 0.9
...
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("KMeansExample").getOrCreate()
val data = spark.read.textFile("data.txt")
// 轉換為Vector RDD
val parsedData = data.map { line =>
Vectors.dense(line.split(" ").map(_.toDouble))
}.toDF("features")
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
data = spark.read.text("data.txt")
def parse_line(line):
return [float(x) for x in line.value.split(" ")]
parsedData = data.rdd.map(parse_line).map(Vectors.dense).toDF(["features"])
k: 簇數量(必填)maxIter: 最大迭代次數(默認20)seed: 隨機種子(可選)tol: 收斂閾值(默認1e-4)import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setK(3)
.setMaxIter(20)
.setSeed(1L)
val model = kmeans.fit(parsedData)
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=3, maxIter=20, seed=1)
model = kmeans.fit(parsedData)
val wcss = model.computeCost(parsedData)
println(s"Within-Cluster Sum of Squares = $wcss")
val predictions = model.transform(parsedData)
predictions.show()
輸出示例:
+-----------------+----------+
| features|prediction|
+-----------------+----------+
|[1.2,3.4,2.1] | 1|
|[0.5,1.2,0.9] | 0|
...
+-----------------+----------+
// 保存
model.save("/path/to/model")
// 加載
import org.apache.spark.ml.clustering.KMeansModel
val sameModel = KMeansModel.load("/path/to/model")
# 保存
model.save("/path/to/model")
# 加載
from pyspark.ml.clustering import KMeansModel
sameModel = KMeansModel.load("/path/to/model")
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
object KMeansExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
val data = spark.read.textFile("data.txt")
val parsedData = data.map { line =>
Vectors.dense(line.split(" ").map(_.toDouble))
}.toDF("features")
val kmeans = new KMeans()
.setK(3)
.setMaxIter(20)
.setSeed(1L)
val model = kmeans.fit(parsedData)
val wcss = model.computeCost(parsedData)
println(s"WCSS: $wcss")
model.transform(parsedData).show()
spark.stop()
}
}
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("KMeansExample").getOrCreate()
data = spark.read.text("data.txt")
parsedData = data.rdd.map(lambda x: [float(v) for v in x.value.split(" ")])\
.map(Vectors.dense).toDF(["features"])
kmeans = KMeans(k=3, maxIter=20, seed=1)
model = kmeans.fit(parsedData)
wcss = model.computeCost(parsedData)
print(f"WCSS: {wcss}")
model.transform(parsedData).show()
spark.stop()
通過 Spark MLlib 的 KMeans 實現,我們可以輕松處理大規模數據集的聚類任務。關鍵步驟包括: 1. 準備向量格式的數據 2. 設置合理的 K 值和迭代參數 3. 評估模型質量(如WCSS) 4. 保存模型供后續使用
實際應用中可能需要嘗試不同的 K 值,或結合其他技術(如PCA)提高聚類效果。 “`
注:實際字數約1500字,可根據需要調整代碼示例的詳細程度或增加更多理論解釋來擴展內容。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。