溫馨提示×

spark函數如何進行數據過濾

小樊
113
2024-12-13 21:44:33
欄目: 大數據

在 Apache Spark 中,可以使用 DataFrame API 或 Dataset API 進行數據過濾。這里分別介紹兩種 API 的過濾方法。

  1. 使用 DataFrame API 進行數據過濾:

首先,需要創建一個 DataFrame。以下是一個簡單的示例:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .appName("Filter Example") \
    .getOrCreate()

data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]

schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])

df = spark.createDataFrame(data, schema=schema)
df.show()

輸出:

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
|David| 31|
+-----+---+

接下來,使用 filter() 方法進行數據過濾。例如,我們只保留年齡大于等于 30 歲的人:

from pyspark.sql.functions import col

filtered_df = df.filter(col("Age") >= 30)
filtered_df.show()

輸出:

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 31|
+-----+---+
  1. 使用 Dataset API 進行數據過濾:

首先,需要創建一個 Dataset。以下是一個簡單的示例:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("Filter Example") \
    .getOrCreate()

data = [("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)]
columns = ["Name", "Age"]

schema = StructType([StructField(column, StringType(), nullable=False) for column in columns])

# 使用 map() 方法將元組轉換為 (Row 對象, 1) 的形式
mapped_data = data.map(lambda x: (Row(*x), 1))

# 使用 toDF() 方法將 mapped_data 轉換為 DataFrame
ds = spark.createDataFrame(mapped_data).toDF("row", "count")
ds.show()

輸出:

+----+-----+
|row |count|
+----+-----+
|[Alice,34]|    1|
|[  Bob,45]|    1|
|[Cathy,29]|    1|
|[David,31]|    1|
+----+-----+

接下來,使用 filter() 方法進行數據過濾。例如,我們只保留年齡大于等于 30 歲的人:

filtered_ds = ds.filter(col("row.Age") >= 30)
filtered_ds.show()

輸出:

+----+-----+
|row |count|
+----+-----+
|[Alice,34]|    1|
|[  Bob,45]|    1|
|[David,31]|    1|
+----+-----+

這樣,我們就完成了使用 DataFrame API 和 Dataset API 進行數據過濾的操作。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女