在 Apache Spark 中,可以使用 DataFrame API 或 Dataset API 進行數據過濾。這里分別介紹兩種 API 的過濾方法。
首先,需要創建一個 DataFrame。以下是一個簡單的示例:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder \
.appName("Filter Example") \
.getOrCreate()
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]
schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])
df = spark.createDataFrame(data, schema=schema)
df.show()
輸出:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
| Bob| 45|
|Cathy| 29|
|David| 31|
+-----+---+
接下來,使用 filter()
方法進行數據過濾。例如,我們只保留年齡大于等于 30 歲的人:
from pyspark.sql.functions import col
filtered_df = df.filter(col("Age") >= 30)
filtered_df.show()
輸出:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
| Bob| 45|
|David| 31|
+-----+---+
首先,需要創建一個 Dataset。以下是一個簡單的示例:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("Filter Example") \
.getOrCreate()
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]
schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])
# 使用 map() 方法將元組轉換為 (Row 對象, 1) 的形式
mapped_data = data.map(lambda x: (Row(*x), 1))
# 使用 toDF() 方法將 mapped_data 轉換為 DataFrame
ds = spark.createDataFrame(mapped_data).toDF("row", "count")
ds.show()
輸出:
+----+-----+
|row |count|
+----+-----+
|[Alice,34]| 1|
|[ Bob,45]| 1|
|[Cathy,29]| 1|
|[David,31]| 1|
+----+-----+
接下來,使用 filter()
方法進行數據過濾。例如,我們只保留年齡大于等于 30 歲的人:
filtered_ds = ds.filter(col("row.Age") >= 30)
filtered_ds.show()
輸出:
+----+-----+
|row |count|
+----+-----+
|[Alice,34]| 1|
|[ Bob,45]| 1|
|[David,31]| 1|
+----+-----+
這樣,我們就完成了使用 DataFrame API 和 Dataset API 進行數據過濾的操作。