溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Structured中怎么利用Streaming實現超低延遲

發布時間:2021-08-10 11:22:57 來源:億速云 閱讀:175 作者:Leah 欄目:大數據

這期內容當中小編將會給大家帶來有關Structured中怎么利用Streaming實現超低延遲,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

要在連續處理模式下運行支持的查詢,您只需指定一個連續觸發器,并將所需的checkpoint間隔作為參數。 例如浪尖的demo如下:

object ContinuousProcessing {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("yarn-client")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
       ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))


   val spark = SparkSession
     .builder
     .appName("StructuredKafkaWordCount")
     .config(sparkConf)
     .getOrCreate()

   spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("subscribe", "StructuredSource")
     .load()
     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
     .writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
     .option("topic", "StructuredSink")
     .option("checkpointLocation","/sql/checkpoint")
     .trigger(Trigger.Continuous("1 second"))  // only change in query
     .start()
     .awaitTermination()
 }

}

checkpoint 間隔為1秒意味著連續處理引擎將每秒記錄查詢的進度。 生成的checkpoint采用與微批處理引擎兼容的格式,因此可以使用任何觸發器重新啟動任何查詢。 例如,假如查詢支持微批處理和連續處理,那么實際上也可以用連續處理觸發器去啟動微批處理觸發器,反之亦然。 

請注意,無論何時切換到連續模式,都將獲得至少一次的容錯保證。

支持的查詢

從Spark 2.3開始,連續處理模式僅支持以下類型的查詢。

  • Operations:在連續模式下僅支持dataset/dataframe的類似于map的操作,即支持projection(select,map,flatMap,mapPartitions等)和selection(where,filter等)。

  • 除了聚合函數(因為尚不支持聚合),current_timestamp()和current_date()(使用時間的確定性計算具有挑戰性)之外,支持所有SQL函數。

Sources 

  • Kafka Source:支持所有操作。

  • Rate source:適合測試。只有連續模式支持的選項是numPartitions和rowsPerSecond。

Sinks

  • Kafka sink:支持所有選項。

  • Memory sink:適合調試。

  • Console sink:適合調試。支持所有操作。請注意,控制臺將打印你在連續觸發器中指定的每個checkpoint間隔。

更詳細的關于sink和source信息,請參閱輸入源和輸出接收器部分的官網。雖然控制臺接收器非常適合測試,但是使用Kafka作為源和接收器可以最好地觀察到端到端的低延遲處理。

注意事項

  • 連續處理引擎啟動多個長時間運行的任務,這些任務不斷從源中讀取數據,處理數據并連續寫入接收器。 查詢所需的任務數取決于查詢可以并行從源讀取的分區數。 因此,在開始連續處理查詢之前,必須確保群集中有足夠的核心并行執行所有任務。 例如,如果您正在讀取具有10個分區的Kafka主題,則群集必須至少具有10個核心才能使查詢正常執行。

  • 停止連續處理流可能會產生虛假的任務終止警告。 這些可以安全地忽略。

  • 目前沒有自動重試失敗的任務。 任何失敗都將導致查詢停止,并且需要從檢查點手動重新啟動。

上述就是小編為大家分享的Structured中怎么利用Streaming實現超低延遲了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女