小編給大家分享一下結構化處理之Spark Session的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創建DataFrame,有三種模式,一種是sql()主要是訪問Hive表;一種是從RDD生成DataFrame,主要從ExistingRDD開始創建;還有一種是read/format格式,從json/txt/csv等數據源格式創建。
先看看第三種方式的創建流程。
1、read/format
def read: DataFrameReader = new DataFrameReader(self)
SparkSession.read()方法直接創建DataFrameReader,然后再DataFrameReader的load()方法來導入外部數據源。load()方法主要邏輯如下:
def load(paths: String*): DataFrame = {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}創建對應數據源類型的DataSource,DataSource解析成BaseRelation,然后通過SparkSession的baseRelationToDataFrame方法從BaseRelation映射生成DataFrame。從BaseRelation創建LogicalRelation,然后調用Dataset.ofRows方法從LogicalRelation創建DataFrame。DataFrame實際就是Dataset。
type DataFrame = Dataset[Row]
baseRelationToDataFrame的定義:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}Dataset.ofRows方法主要是將邏輯計劃轉換成物理計劃,然后生成新的Dataset。
2、執行
SparkSession的執行關鍵是如何從LogicalPlan生成物理計劃。我們試試跟蹤這部分邏輯。
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
Dataset的count()動作觸發物理計劃的執行,調用物理計劃plan的executeCollect方法,該方法實際上會調用doExecute()方法生成Array[InternalRow]格式。executeCollect方法在SparkPlan中定義。
3、HadoopFsRelation
需要跟蹤下如何從HadoopFsRelation生成物理計劃(也就是SparkPlan)
通過FileSourceStrategy來解析。它在FileSourceScanExec上疊加Filter和Projection等操作,看看FileSourceScanExec的定義:
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
。。。
}它的主要執行代碼doExecute()的功能邏輯如下:
protected override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
// in the case of fallback, this batched scan should never fail because of:
// 1) only primitive types are supported
// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
WholeStageCodegenExec(this).execute()
} else {
val unsafeRows = {
val scan = inputRDD
if (needsUnsafeRowConversion) {
scan.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
} else {
scan
}
}
val numOutputRows = longMetric("numOutputRows")
unsafeRows.map { r =>
numOutputRows += 1
r
}
}
}inputRDD有兩種方式創建,一是createBucketedReadRDD,二是createNonBucketedReadRDD。兩者沒有本質的區別,僅僅是文件分區規則的不同。
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
createNonBucketedReadRDD調用FileScanRDD :
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)以上是“結構化處理之Spark Session的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。