本篇內容介紹了“DataSourceV2流處理方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
SparkSession結構化流處理最后其實是通過DataSet的writeStream觸發執行的。這點與傳統的spark sql方式是不一樣的。writeStream會找到StreamingQueryManager的startQuery方法,然后一步步到MicroBatchExecution和ContinuousExecution。
核心點:MicroBatchExecution和ContinuousExecution里面會對StreamingRelationV2進行轉換,轉換成StreamingDataSourceV2Relation。而MicroBatchExecution和ContinuousExecution只有在StreamingQueryManager的createQuery方法中才會被使用到。那么這個StreamingQueryManager的createQuery方法會在哪里被使用到呢?跟蹤代碼會發現是DataStreamWriter中調用StreamingQueryManager的startQuery方法進而調用到createQuery方法的。
而DataStreamWriter是Dataset的writeStream創建的。
【以上說的是寫入流的過程】。
關鍵類:BaseSessionStateBuilder,里面有analyzer的定義。
protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
DataSourceResolution(conf, this.catalogManager) +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetectAmbiguousSelfJoin(conf) +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
customPostHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
PreReadCheck +:
HiveOnlyCheck +:
TableCapabilityCheck +:
customCheckRules
}這里沒有特別需要關注的,先忽略。
DataSourceV2是指spark中V2版本的結構化流處理引擎框架。這里說的邏輯計劃就是StreamingDataSourceV2Relation,對應的物理計劃分成兩類:MicroBatchScanExec和ContinuousScanExec,兩者的應用場景從取名上就可以分辨出來,一個是微批處理模式;另一個則是連續流模式。
我們先從物理計劃開始解析。
這兩個物理計劃基于同一個父類:DataSourceV2ScanExecBase,先看看父類的代碼:
關鍵代碼:
override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
numOutputRows += 1
r
}
}子類需要重寫inputRDD。
兩種重要的checkpoint屬性:
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
offsetLog是當前讀取到哪個offset了,commitLog是當前處理到哪個Offset了。這兩個Log非常重要,合在一起保證了Exactly-once語義。
好了,先看看MicroBatchScanExec是怎么重寫inputRDD的。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)
override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}有三個地方,第一個是重寫Seq[InputPartition],調用stream的planInputPartitions方法,注意下這里的stream類型是MicroBatchStream;第二個是重寫readerFactory,獲得讀取器工廠類;第三個重寫是inputRDD,創建DataSourceRDD作為inputRDD,而前兩步重寫的Seq[InputPartition]和readerFactory作為DataSourceRDD的構造參數。
這里首先大概看下DataSourceRDD的功能是什么。
DataSourceRDD這個類的代碼很短,很容易看清楚。最重要的就是compute方法,先給出全部代碼:
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val inputPartition = castPartition(split).inputPartition
val reader: PartitionReader[_] = if (columnarReads) {
partitionReaderFactory.createColumnarReader(inputPartition)
} else {
partitionReaderFactory.createReader(inputPartition)
}
context.addTaskCompletionListener[Unit](_ => reader.close())
val iter = new Iterator[Any] {
private[this] var valuePrepared = false
override def hasNext: Boolean = {
if (!valuePrepared) {
valuePrepared = reader.next()
}
valuePrepared
}
override def next(): Any = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
valuePrepared = false
reader.get()
}
}
// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
}先根據讀取器工廠類創建一個PartitionReader,然后調用PartitionReader的get方法獲取數據。就是這么簡單了!
最后再看下ContinuousScanExec的定義。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)
override lazy val readerFactory: ContinuousPartitionReaderFactory = {
stream.createContinuousReaderFactory()
}
override lazy val inputRDD: RDD[InternalRow] = {
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
.askSync[Unit](SetReaderPartitions(partitions.size))
new ContinuousDataSourceRDD(
sparkContext,
sqlContext.conf.continuousStreamingExecutorQueueSize,
sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
partitions,
schema,
readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
}和微批處理模式MicroBatchScanExec類似,也有三個地方重寫,第一個是重寫Seq[InputPartition],調用stream的planInputPartitions方法,注意下這里的stream類型是ContinuousStream;第二個是重寫readerFactory,獲得讀取器工廠類ContinuousPartitionReaderFactory;第三個重寫是inputRDD,創建ContinuousDataSourceRDD作為inputRDD,而前兩步重寫的Seq[InputPartition]和readerFactory作為ContinuousDataSourceRDD的構造參數。
這里首先大概看下ContinuousDataSourceRDD的功能是什么。
ContinuousDataSourceRDD的代碼和DataSourceRDD的基本差不多,直接看源碼吧,這里就不細說了,也沒啥好細說的,顯得啰里啰唆。
對于Kafka來說,ContinuousDataSourceRDD和DataSourceRDD其實最終是一樣的
“DataSourceV2流處理方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。