溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark sql在scala中使用的方式有哪些

發布時間:2021-12-09 09:22:33 來源:億速云 閱讀:156 作者:iii 欄目:開發技術

這篇文章主要介紹“spark sql在scala中使用的方式有哪些”,在日常操作中,相信很多人在spark sql在scala中使用的方式有哪些問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spark sql在scala中使用的方式有哪些”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

package hgs.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
//第一種方法創建dataframe
object SqlTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sqltest1").setMaster("local")
    val context = new SparkContext(conf)
    val sqlContext = new SQLContext(context)
    
    val rdd = context.textFile("d:\\person",1)
    val rdd2 = rdd.map(x=>{val t = x.split(" ");person(t(0).toInt,t(1),t(2).toInt)})
    //第一種方法創建dataframe,在這里需要導入隱式轉換
    import sqlContext.implicits._      
    val persondf = rdd2.toDF()  
    //這個方法在2.1.0里面被廢除
    //persondf.registerTempTable("person")
    //使用該函數代替
    persondf.createOrReplaceTempView("person")
    val result = sqlContext.sql("select * from person order by age desc")
    //打印查詢的結果
    result.show()
    //或者將結果保存到文件
    result.write.json("d://personselect")
   
    context.stop()
  }
}
case class person(id:Int,name:String,age:Int)
//第二種方法創建dataframe
//3.1.2.	通過StructType直接指定Schema
object SqlTest2{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sqltest2").setMaster("local")
    val context = new SparkContext(conf)
    val sqlContext = new SQLContext(context)
    
    val rdd = context.textFile("d:\\person",1)
    
    //第一種方法創建dataframe,在這里需要導入隱式轉換
    //創建schema,即一個映射關系   
    val personShcema = StructType(
    List(
        //下面為一個列的描述,分別為 列名,數據類型,是否為空
        StructField("id",IntegerType,true),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true)
     )    
    )
    
    val rdd2 = rdd.map(x=>{val t = x.split(" ");Row(t(0).toInt,t(1),t(2).toInt)})
    //通過這種方式創建dataframe
    val persondf = sqlContext.createDataFrame(rdd2, personShcema)
    //將dataframe映射為一個臨時的表
    persondf.createOrReplaceTempView("person")
    //查詢數據展示
    sqlContext.sql("select * from person order by age desc").show()
    context.stop()
/*  查詢出的數據
 *  +---+----+---+
    | id|name|age|
    +---+----+---+
    |  1| hgs| 26|
    |  3|  zz| 25|
    |  2|  wd| 24|
    |  4|  cm| 24|
    +---+----+---+
    */
    
  }
}
一些筆記:
checkpoint:
	將rdd中間過程持久化到hdfs上面,如果某個rdd失敗,則從hdfs回復,這樣代價較小
	sc.setCheckpointDir("hdfs dir or other fs dir "),建議將RDD cache之后再
	checkpoin這樣將減少一次運算直接從內存中將RDD進行checkpoin
	但是這樣之前依賴的RDD也會被丟棄
RDD Objects構建DAG--->DAGScheduler(TaskSet(每個Task在每個excutor上&&切分stage,并提價stage))
    ------>TaskScheduler(Task&&提交task,)------>Worker	(執行task)
stage:根據依賴關系區分stage,當遇到一個寬依賴(節點之間交換數據)的時候劃分一個stage
	其中窄依賴:父RDD的分區數據只傳向一個子RDD分區,而寬依賴則是父RDD的分區數據會傳向多個子RDD的或者多個分區
	
spark SQL:處理結構化的數據
	DataFrames:與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,
		除了數據以外,還記錄數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持
		嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層
		的關系操作,比函數式的RDD API要更加友好,門檻更低。由于與R和Pandas的DataFrame類似,
		Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗
	創建DataFrame: 將數據映射為class,RDD.toDF 
	通過sql查詢,將df注冊為一個表1. df.registerTempTable("test") sqlContext.sql("select * from test").show 
								 2.通過StructType定義:StrutType(List())
hive 3.0.0 與spark
	1.將hive-site.xml hdfs-site.xml  core-site.xml復制到spark的conf文件夾下 ,將mysql驅動放到spark的jars文件夾下面
	2.在hive中的語句在spark-sql中完全適用:
		create table person(id int,name string,age int) row format delimited fields terminated by ' ';
		load data inpath 'hdfs://bigdata00:9000/person' into table person;
		select * from person;
		數據如下:
			1	hgs	26
			2	wd	24
			3	zz	25
			4	cm	24
	3.在spark-sql console交互中會打印很多的INFO級別的信息,很煩人,解決辦法是
		在conf文件夾下:
		   mv log4j.properties.template  log4j.properties
			將log4j.rootCategory=INFO, console 修改為log4j.rootCategory=WARN, console

到此,關于“spark sql在scala中使用的方式有哪些”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女