This article walks through how a Delta write in Spark achieves ACID transaction semantics, following the code path from WriteIntoDelta.run down to the final commit of the delta log.
## Analysis
Go straight to `WriteIntoDelta.run`:
```scala
override def run(sparkSession: SparkSession): Seq[Row] = {
  deltaLog.withNewTransaction { txn =>
    val actions = write(txn, sparkSession)
    val operation = DeltaOperations.Write(mode, Option(partitionColumns),
      options.replaceWhere, options.userMetadata)
    txn.commit(actions, operation)
  }
  Seq.empty
}
```
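For context, this is the code path an ordinary DataFrame write against a Delta table ends up in; a minimal illustrative example (the data and path are made up):

```scala
// Illustrative only: a plain Delta append that eventually reaches WriteIntoDelta.run.
spark.range(10).toDF("value")
  .write
  .format("delta")
  .mode("append")
  .save("/tmp/delta/events") // hypothetical table path
```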
Let's look at the `deltaLog.withNewTransaction` method:
```scala
def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    update()
    val txn = new OptimisticTransaction(this)
    OptimisticTransaction.setActive(txn)
    thunk(txn)
  } finally {
    OptimisticTransaction.clearActive()
  }
}
```
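In the code above, `setActive`/`clearActive` keep the active transaction in a thread-local so code executed inside `thunk` can find it implicitly; the pattern is roughly the following (a simplified sketch with placeholder names, not Delta's actual class):

```scala
// Simplified sketch of the setActive/clearActive pattern (names are placeholders).
object ActiveTransaction {
  private val active = new ThreadLocal[Option[AnyRef]] {
    override def initialValue(): Option[AnyRef] = None
  }
  def setActive(txn: AnyRef): Unit = {
    require(active.get().isEmpty, "a transaction is already active on this thread")
    active.set(Some(txn))
  }
  def clearActive(): Unit = active.set(None)
  def getActive: Option[AnyRef] = active.get()
}
```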
The `update()` call at the top of `withNewTransaction` synchronously invokes `updateInternal` to refresh the current DeltaLog snapshot. The relevant part of `updateInternal` is:
```scala
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersion)
if (segment.version == currentSnapshot.version) {
  // Exit early if there is no new file
  lastUpdateTimestamp = clock.getTimeMillis()
  return currentSnapshot
}
logInfo(s"Loading version ${segment.version}" +
  segment.checkpointVersion.map(v => s"starting from checkpoint version $v."))
val newSnapshot = createSnapshot(
  segment,
  minFileRetentionTimestamp,
  segment.lastCommitTimestamp)
...
currentSnapshot.uncache()
currentSnapshot = newSnapshot
```
It first uses `getLogSegmentForVersion` to locate the latest log segment, builds a new snapshot from it, and then refreshes the in-memory copy.
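For reference, a log segment is assembled from the JSON commit files (and periodic Parquet checkpoints) under the `_delta_log` directory, whose names are simply zero-padded version numbers; a minimal sketch of the naming convention:

```scala
// Sketch of the _delta_log naming convention: versions are zero-padded to 20 digits.
def deltaFileName(version: Long): String = f"$version%020d.json" // e.g. 00000000000000000003.json
def checkpointFileName(version: Long): String = f"$version%020d.checkpoint.parquet"
```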
An `OptimisticTransaction` is then set as active, and the statement is executed inside that transaction:
```scala
val actions = write(txn, sparkSession)
val operation = DeltaOperations.Write(mode, Option(partitionColumns),
  options.replaceWhere, options.userMetadata)
txn.commit(actions, operation)
```
`val actions = write(txn, sparkSession)`
was already analyzed in the earlier article on the file-writing base classes FileFormat/FileCommitProtocol: it returns the file actions (essentially a Seq of AddFile), while the actual data files have already been written under the table directory.
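For example, each AddFile ends up in the delta log as one JSON line of roughly the following shape (all values here are illustrative): {"add":{"path":"part-00000-1a2b3c4d.c000.snappy.parquet","partitionValues":{},"size":306,"modificationTime":1609400013000,"dataChange":true}}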
`val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere, options.userMetadata)`
records that the operation being committed is a Delta write.
`txn.commit(actions, operation)` is the key step that commits to the delta log:
```scala
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
    deltaLog,
    "delta.commit") {
  commitStartNano = System.nanoTime()

  val version = try {
    // Try to commit at the next version.
    var finalActions = prepareCommit(actions, op)

    // Find the isolation level to use for this commit
    val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    val isolationLevelToUse = if (noDataChanged) {
      // If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
      // provides Serializable guarantee. Hence, allow reduced conflict detection by using
      // SnapshotIsolation of what the table isolation level is.
      SnapshotIsolation
    } else {
      Serializable
    }

    val isBlindAppend = {
      val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
      val onlyAddFiles =
        finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
      onlyAddFiles && !dependsOnFiles
    }

    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) {
      commitInfo = CommitInfo(
        clock.getTimeMillis(),
        op.name,
        op.jsonEncodedValues,
        Map.empty,
        Some(readVersion).filter(_ >= 0),
        None,
        Some(isBlindAppend),
        getOperationMetrics(op),
        getUserMetadata(op))
      finalActions = commitInfo +: finalActions
    }

    // Register post-commit hooks if any
    lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
    if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
      registerPostCommitHook(GenerateSymlinkManifest)
    }

    val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)
    logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
    postCommit(commitVersion, finalActions)
    commitVersion
  } catch {
    case e: DeltaConcurrentModificationException =>
      recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
      throw e
    case NonFatal(e) =>
      recordDeltaEvent(
        deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
      throw e
  }

  runPostCommitHooks(version, actions)

  version
}
```
prepareCommit performs some pre-commit checks and adds extra actions to the commit:

- If this is the first commit, a Protocol action has to be added, e.g. {"protocol":{"minReaderVersion":1,"minWriterVersion":2}}.
- If the metadata changed, a newMetadata action has to be added, e.g. {"metaData":{"id":"2b2457e3-ce74-4897-abbd-04a94692304a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1609398723678}}.
- If spark.databricks.delta.commitInfo.enabled is set (the default is true), a commitInfo action is added as well, e.g. {"commitInfo":{"timestamp":1609400013646,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"306","numOutputRows":"0"}}}.
- If Presto / Athena compatibility is enabled, a GenerateSymlinkManifest post-commit hook is also registered and invoked after the commit succeeds (see the example after this list).
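The table property that turns on that manifest hook is `delta.compatibility.symlinkFormatManifest.enabled`; for example (table path is illustrative):

```scala
// Illustrative: enable automatic symlink manifest generation on an existing Delta table.
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/events`
  SET TBLPROPERTIES ('delta.compatibility.symlinkFormatManifest.enabled' = 'true')
""")
```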
`doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)` then synchronously commits the final actions to the delta log:
```scala
protected def doCommit(
    attemptVersion: Long,
    actions: Seq[Action],
    attemptNumber: Int,
    isolationLevel: IsolationLevel): Long = deltaLog.lockInterruptibly {
  try {
    ...
    deltaLog.store.write(
      deltaFile(deltaLog.logPath, attemptVersion),
      actions.map(_.json).toIterator)

    val commitTime = System.nanoTime()
    val postCommitSnapshot = deltaLog.update()
    if (postCommitSnapshot.version < attemptVersion) {
      recordDeltaEvent(deltaLog, "delta.commit.inconsistentList", data = Map(
        "committedVersion" -> attemptVersion,
        "currentVersion" -> postCommitSnapshot.version
      ))
      throw new IllegalStateException(
        s"The committed version is $attemptVersion " +
          s"but the current version is ${postCommitSnapshot.version}.")
    }

    // Post stats
    var numAbsolutePaths = 0
    var pathHolder: Path = null
    val distinctPartitions = new mutable.HashSet[Map[String, String]]
    val adds = actions.collect {
      case a: AddFile =>
        pathHolder = new Path(new URI(a.path))
        if (pathHolder.isAbsolute) numAbsolutePaths += 1
        distinctPartitions += a.partitionValues
        a
    }
    ...

    attemptVersion
  } catch {
    case e: java.nio.file.FileAlreadyExistsException =>
      checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel)
  }
}
```
`deltaLog.store.write(deltaFile(deltaLog.logPath, attemptVersion), actions.map(_.json).toIterator)` goes straight to HDFSLogStore's write method, which ultimately calls writeInternal; attemptVersion here is the current version + 1. Let's look at writeInternal:
```scala
private def writeInternal(path: Path, actions: Iterator[String], overwrite: Boolean): Unit = {
  val fc: FileContext = try {
    getFileContext(path)
  } catch {
    case e: IOException if e.getMessage.contains(noAbstractFileSystemExceptionMessage) =>
      val newException = DeltaErrors.incorrectLogStoreImplementationException(sparkConf, e)
      logError(newException.getMessage, newException.getCause)
      throw newException
  }
  if (!overwrite && fc.util.exists(path)) {
    // This is needed for the tests to throw error with local file system
    throw new FileAlreadyExistsException(path.toString)
  }

  val tempPath = createTempPath(path)
  var streamClosed = false // This flag is to avoid double close
  var renameDone = false // This flag is to save the delete operation in most of cases.
  val stream = fc.create(
    tempPath, EnumSet.of(CREATE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))

  try {
    actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
    stream.close()
    streamClosed = true
    try {
      val renameOpt = if (overwrite) Options.Rename.OVERWRITE else Options.Rename.NONE
      fc.rename(tempPath, path, renameOpt)
      renameDone = true
      // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
      tryRemoveCrcFile(fc, tempPath)
    } catch {
      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
        throw new FileAlreadyExistsException(path.toString)
    }
  } finally {
    if (!streamClosed) {
      stream.close()
    }
    if (!renameDone) {
      fc.delete(tempPath, false)
    }
  }
}
```
If the target log file already exists, a FileAlreadyExistsException is thrown; otherwise the log file is written. Note that for the local file system the rename has to be performed under a lock, because a local rename does not fail even when the target file already exists; other file systems do not need this extra synchronization. If deltaLog.store.write completes without an exception, the latest snapshot is fetched, statistics are recorded, and the passed-in attemptVersion is returned. If it does throw, checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel) is executed: it has to examine the actions committed by other writers between the start of this transaction and the failed attempt, and if they do not conflict with the current actions the commit is retried, otherwise an exception is raised.
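The conflict check behind checkAndRetry boils down to: replay the log entries that other writers managed to commit after our read version, and retry only if none of them invalidates what this transaction read or wrote. A much-simplified sketch of that idea (class and method names are placeholders, not Delta's actual implementation):

```scala
// Simplified illustration of the optimistic conflict check (not the real checkAndRetry).
final case class WinningCommit(
    version: Long,
    removedPaths: Set[String],  // files removed by the concurrent commit
    metadataChanged: Boolean)   // whether the concurrent commit changed the table metadata

def noConflict(filesReadByUs: Set[String], winningCommits: Seq[WinningCommit]): Boolean =
  winningCommits.forall { c =>
    // Conflict if a concurrent commit changed the schema/protocol or deleted a file we depended on.
    !c.metadataChanged && c.removedPaths.intersect(filesReadByUs).isEmpty
  }

// If noConflict(...) holds, doCommit is simply attempted again at the next free version;
// otherwise a DeltaConcurrentModificationException is thrown to the caller.
```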
postCommit performs the checkpoint: every 10 commits, Delta Lake automatically writes a Parquet checkpoint file under the _delta_log directory, which makes replays fast, and it also cleans up expired delta log files (retained for 30 days by default). The checkpoint merges AddFile and RemoveFile actions: if file A was added and later removed, A no longer appears after the merge:
```scala
def checkpoint(): Unit = recordDeltaOperation(this, "delta.checkpoint") {
  val checkpointMetaData = checkpoint(snapshot)
  val json = JsonUtils.toJson(checkpointMetaData)
  store.write(LAST_CHECKPOINT, Iterator(json), overwrite = true)
  doLogCleanup()
}
```
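The "every 10 commits" trigger itself lives in postCommit and is conceptually just a modulo check on the committed version; a minimal sketch, assuming the default interval of 10 (the real value comes from the table's checkpointInterval configuration):

```scala
// Sketch of the postCommit checkpoint trigger; the interval is configurable per table.
val checkpointInterval = 10
def shouldCheckpoint(committedVersion: Long): Boolean =
  committedVersion != 0 && committedVersion % checkpointInterval == 0
```

Note also that the `store.write(LAST_CHECKPOINT, ...)` call above records the newest checkpoint version in the `_last_checkpoint` file, so a reader can jump straight to the latest checkpoint instead of listing every commit.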
runPostCommitHooks executes the registered post-commit hooks, if any.
That completes the flow of writing the delta log for a Delta write. The overall flow is:
```
update              fetch the latest snapshot
  |
  v
write()             write the delta data files
  |
  v
commit              commit the transaction
  -> prepareCommit  pre-commit checks plus extra actions
  |
  v
doCommit            actually write the delta log, retrying until it succeeds or a real conflict is hit
  |
  v
postCommit          checkpoint: merge AddFile/RemoveFile for fast replays and clean up expired delta logs
  |
  v
runPostCommitHooks  run the registered hooks, if any
  |
  v
done
```
This concludes the walkthrough of how a Delta write in Spark commits its ACID transaction.