這篇文章主要介紹“spark sql在scala中使用的方式有哪些”,在日常操作中,相信很多人在spark sql在scala中使用的方式有哪些問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spark sql在scala中使用的方式有哪些”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
package hgs.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
//第一種方法創建dataframe
object SqlTest1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sqltest1").setMaster("local")
val context = new SparkContext(conf)
val sqlContext = new SQLContext(context)
val rdd = context.textFile("d:\\person",1)
val rdd2 = rdd.map(x=>{val t = x.split(" ");person(t(0).toInt,t(1),t(2).toInt)})
//第一種方法創建dataframe,在這里需要導入隱式轉換
import sqlContext.implicits._
val persondf = rdd2.toDF()
//這個方法在2.1.0里面被廢除
//persondf.registerTempTable("person")
//使用該函數代替
persondf.createOrReplaceTempView("person")
val result = sqlContext.sql("select * from person order by age desc")
//打印查詢的結果
result.show()
//或者將結果保存到文件
result.write.json("d://personselect")
context.stop()
}
}
case class person(id:Int,name:String,age:Int)
//第二種方法創建dataframe
//3.1.2. 通過StructType直接指定Schema
object SqlTest2{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sqltest2").setMaster("local")
val context = new SparkContext(conf)
val sqlContext = new SQLContext(context)
val rdd = context.textFile("d:\\person",1)
//第一種方法創建dataframe,在這里需要導入隱式轉換
//創建schema,即一個映射關系
val personShcema = StructType(
List(
//下面為一個列的描述,分別為 列名,數據類型,是否為空
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)
)
)
val rdd2 = rdd.map(x=>{val t = x.split(" ");Row(t(0).toInt,t(1),t(2).toInt)})
//通過這種方式創建dataframe
val persondf = sqlContext.createDataFrame(rdd2, personShcema)
//將dataframe映射為一個臨時的表
persondf.createOrReplaceTempView("person")
//查詢數據展示
sqlContext.sql("select * from person order by age desc").show()
context.stop()
/* 查詢出的數據
* +---+----+---+
| id|name|age|
+---+----+---+
| 1| hgs| 26|
| 3| zz| 25|
| 2| wd| 24|
| 4| cm| 24|
+---+----+---+
*/
}
}一些筆記:
checkpoint:
將rdd中間過程持久化到hdfs上面,如果某個rdd失敗,則從hdfs回復,這樣代價較小
sc.setCheckpointDir("hdfs dir or other fs dir "),建議將RDD cache之后再
checkpoin這樣將減少一次運算直接從內存中將RDD進行checkpoin
但是這樣之前依賴的RDD也會被丟棄
RDD Objects構建DAG--->DAGScheduler(TaskSet(每個Task在每個excutor上&&切分stage,并提價stage))
------>TaskScheduler(Task&&提交task,)------>Worker (執行task)
stage:根據依賴關系區分stage,當遇到一個寬依賴(節點之間交換數據)的時候劃分一個stage
其中窄依賴:父RDD的分區數據只傳向一個子RDD分區,而寬依賴則是父RDD的分區數據會傳向多個子RDD的或者多個分區
spark SQL:處理結構化的數據
DataFrames:與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,
除了數據以外,還記錄數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持
嵌套數據類型(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的是一套高層
的關系操作,比函數式的RDD API要更加友好,門檻更低。由于與R和Pandas的DataFrame類似,
Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗
創建DataFrame: 將數據映射為class,RDD.toDF
通過sql查詢,將df注冊為一個表1. df.registerTempTable("test") sqlContext.sql("select * from test").show
2.通過StructType定義:StrutType(List())
hive 3.0.0 與spark
1.將hive-site.xml hdfs-site.xml core-site.xml復制到spark的conf文件夾下 ,將mysql驅動放到spark的jars文件夾下面
2.在hive中的語句在spark-sql中完全適用:
create table person(id int,name string,age int) row format delimited fields terminated by ' ';
load data inpath 'hdfs://bigdata00:9000/person' into table person;
select * from person;
數據如下:
1 hgs 26
2 wd 24
3 zz 25
4 cm 24
3.在spark-sql console交互中會打印很多的INFO級別的信息,很煩人,解決辦法是
在conf文件夾下:
mv log4j.properties.template log4j.properties
將log4j.rootCategory=INFO, console 修改為log4j.rootCategory=WARN, console到此,關于“spark sql在scala中使用的方式有哪些”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。