To unit test a Spark function, you can follow the steps below. First, import unittest, SparkSession, and the function under test:
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from your_module import your_spark_function  # import the Spark function you want to test
Next, define a test class that inherits from unittest.TestCase; setUp creates a local SparkSession, and the test method applies the function and compares the result against an expected DataFrame:

class TestYourSparkFunction(unittest.TestCase):

    def setUp(self):
        # Start a local SparkSession for each test
        self.spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Test Your Spark Function") \
            .getOrCreate()
    def test_your_spark_function(self):
        # Create test data
        data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
        columns = ["Name", "Age"]
        df = self.spark.createDataFrame(data, columns)

        # Apply the Spark function under test
        result_df = df.withColumn("AgeGroup", your_spark_function(col("Age")))

        # Verify the result against an expected DataFrame
        expected_data = [("Alice", 34, "30-40"), ("Bob", 45, "40-50"), ("Cathy", 29, "20-30")]
        expected_columns = ["Name", "Age", "AgeGroup"]
        expected_df = self.spark.createDataFrame(expected_data, expected_columns)

        self.assertEqual(result_df.collect(), expected_df.collect())
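Note that collect() does not guarantee row order once the query involves shuffles. As a sketch (reusing result_df and expected_df from the test above), sorting both sides before comparing keeps the assertion independent of row order:

        # Drop-in replacement for the final assertion above: sort the collected
        # rows so the comparison does not depend on row order.
        self.assertEqual(sorted(result_df.collect()), sorted(expected_df.collect()))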
Add a tearDown method to clean up resources:

    def tearDown(self):
        # Stop the SparkSession after each test
        self.spark.stop()
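If the class grows to many tests, starting a new SparkSession in setUp for every test method becomes slow. A minimal sketch of an alternative, assuming you are happy to share one session across the whole class, is to use unittest's setUpClass/tearDownClass hooks instead:

    @classmethod
    def setUpClass(cls):
        # Create one shared SparkSession for all tests in this class
        cls.spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Test Your Spark Function") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls):
        # Stop the shared session once all tests have run
        cls.spark.stop()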
Add a main block so the tests can be run as a script:

if __name__ == "__main__":
    unittest.main()
Combine the snippets above into a single Python file and run it. This executes the unit test and verifies that your Spark function behaves as expected.
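For completeness, here is a minimal sketch of what your_spark_function might look like so that the test above passes; the bucket boundaries and the "other" fallback are assumptions inferred from the expected AgeGroup values, not part of your actual module:

from pyspark.sql import Column
from pyspark.sql.functions import when

def your_spark_function(age: Column) -> Column:
    # Hypothetical bucketing logic matching the "20-30" / "30-40" / "40-50" labels used in the test
    return when((age >= 20) & (age < 30), "20-30") \
        .when((age >= 30) & (age < 40), "30-40") \
        .when((age >= 40) & (age < 50), "40-50") \
        .otherwise("other")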