溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark-SQL的具體編程場景

發布時間:2020-06-06 03:32:22 來源:網絡 閱讀:673 作者:原生zzy 欄目:大數據

入門案例:

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        /**
          * 注意在spark 2.0之后:
          * val sqlContext = new SQLContext(sparkContext)
          * val hiveContext = new HiveContext(sparkContext)
          * 主構造器被私有化,所以這里只能使用SparkSession對象創建
          */
        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //加載數據為DataFrame,這里加載的是json數據
        //數據格式:{name:'',age:18}
        val perDF: DataFrame = sqlContext.read.json("hdfs://zzy/data/person.json")

        //查看二維表結構
        perDF.printSchema()

        //查看數據,默認顯示20條記錄
        perDF.show()

        //復雜查詢
        perDF.select("name").show() //指定字段進行查詢
        perDF.select(new Column("name"),new Column("age").>(18)).show()  //指定查詢條件進行查詢
        perDF.select("name","age").where(new Column("age").>(18)).show() //指定查詢條件進行查詢
        perDF.select("age").groupBy("age").avg("age") //聚合操作
    }
}

如果對入門案例不太了解的話,接下來分步驟的介紹:

(1)RDD/DataSet//DataFrame/list 之間的轉化

   通過RDD轉換為DataFrame/DataSet,有兩種方式:
    - 通過反射的方式將RDD或者外部的集合轉化為dataframe/datasets
    - 要通過編程動態的來將外部的集合或者RDD轉化為dataframe或者dataset
   注意:如果是dataFrame對應的是java bean ,如果是dataSet對應的是case class

通過反射的方式將RDD或者外部的集合轉化為dataframe/datasets

數據準備

case class Student(name:String, birthday:String, province:String)
val stuList = List(
      new Student("委xx", "1998-11-11", "山西"),
      new Student("吳xx", "1999-06-08", "河南"),
      new Student("戚xx", "2000-03-08", "山東"),
      new Student("王xx", "1997-07-09", "安徽"),
      new Student("薛xx", "2002-08-09", "遼寧")
    )

list --> DataFrame:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext

        /**
          * list--->DataFrame
          * 將scala集合轉換為java集合
          */
        val javaList: util.List[Student] = JavaConversions.seqAsJavaList(stuList)
        val stuDF: DataFrame = sqlContext.createDataFrame(javaList,classOf[Student])
        val count = stuDF.count()
        println(count)

RDD --> DataFrame:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * RDD--->DataFrame
          */
        val stuRDD: RDD[Student] = sc.makeRDD(stuList)
        val stuDF: DataFrame = sqlContext.createDataFrame(stuRDD,classOf[Student])
        val count = stuDF.count()
        println(count)

list --> DataSet:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * list--->DataSet
          */
        //如果創建Dataset 必須導入下面的隱式轉換
        import spark.implicits._
        val stuDF: Dataset[Student] = sqlContext.createDataset(stuList)
        stuDF.createTempView("student")
        //使用完整的sql語句進行查詢,使用反射的方式,只有Dataset可以,dataFrame不行
        val sql=
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()

Spark-SQL的具體編程場景
RDD --> DataSet:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext
        /**
          * RDD--->DataSet
          */
        //如果創建Dataset 必須導入下面的隱式轉換
        import spark.implicits._
        val stuRDD: RDD[Student] = sc.makeRDD(stuList)
        val stuDF: Dataset[Student] = sqlContext.createDataset(stuRDD)
        stuDF.createTempView("student")
        //使用完整的sql語句進行查詢,使用反射的方式,只有Dataset可以,dataFrame不行
        val sql=
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()
通過編程動態的來將外部的集合或者RDD轉化為dataframe或者dataset

list --> DataFrame:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext

        //list-DataFrame

        //1.將list中的元素全部轉化為Row
        val RowList: List[Row] = stuList.map(item => {
            Row(item.name, item.birthday, item.province)
        })
        //2.構建元數據
        val schema=StructType(List(
            StructField("name",DataTypes.StringType),
            StructField("birthday",DataTypes.StringType),
            StructField("province",DataTypes.StringType)
        ))
        //將scala的集合轉化為java集合
        val javaList = JavaConversions.seqAsJavaList(RowList)
        val stuDF = spark.createDataFrame(javaList,schema)
        stuDF.createTempView("student")
        //使用完整的sql語句進行查詢,使用動態編程的方式,Dataset、dataFrame都可以
        val sql=
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()

RDD--> DataFrame:

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
            .setMaster("local[2]")
            .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
            .registerKryoClasses(Array(classOf[Student]))
        val spark: SparkSession = SparkSession.builder().config(conf)
            .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext

        //RDD-DataFrame

        //將RDD中的元素轉換為Row
        val RowRDD: RDD[Row] = sc.makeRDD(stuList).map(item => {
            Row(item.name, item.birthday, item.province)
        })

        //2.構建元數據
        val schema=StructType(List(
            StructField("name",DataTypes.StringType),
            StructField("birthday",DataTypes.StringType),
            StructField("province",DataTypes.StringType)
        ))
        val stuDF = spark.createDataFrame(RowRDD,schema)
        stuDF.createTempView("student")
        //使用完整的sql語句進行查詢,使用動態編程的方式,Dataset、dataFrame都可以
        val sql=
            """
              |select * from student
            """.stripMargin
        spark.sql(sql).show()

由于構建DataFrame和構建DataSet一模一樣,這里就不在演示

(2)spark SQL加載數據的方式

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
                .setMaster("local[2]")

        val spark: SparkSession = SparkSession.builder().config(conf)
                .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext

        //早期版本加載:parquet文件
        sqlContext.load("hdfs://zzy/hello.parquet")
        //加載json數據
        sqlContext.read.json("hdfs://zzy/hello.json")
        //加載普通文件
        sqlContext.read.text("hdfs://zzy/hello.txt")
        //加載csv
        sqlContext.read.csv("hdfs://zy/hello.csv")
        //讀取jdbc的數據
        val url="jdbc:mysql://localhost:3306/hello"
        val properties=new Properties()
        properties.setProperty("user","root")
        properties.setProperty("password","123456")
        val tableName="book"
        sqlContext.read.jdbc(url,tableName,properties)

(3)spark SQL數據落地的方式

        //屏蔽多余的日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //構建編程入口
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparkSqlTest")
                .setMaster("local[2]")

        val spark: SparkSession = SparkSession.builder().config(conf)
                .getOrCreate()

        //創建sqlcontext對象
        val sqlContext: SQLContext = spark.sqlContext
        //創建sparkContext
        val sc: SparkContext = spark.sparkContext
        val testFD: DataFrame = sqlContext.read.text("hdfs://zzy/hello.txt")

        //寫入到普通文件
        testFD.write.format("json") //以什么格式寫入
                .mode(SaveMode.Append)  //寫入方式
                .save("hdfs://zzy/hello.json")  //寫入的文件位置

        //寫入到數據庫
        val url="jdbc:mysql://localhost:3306/hello"
        val table_name="book"
        val prots=new Properties()
        prots.put("user","root")
        prots.put("password","123456")
        testFD.write.mode(SaveMode.Append).jdbc(url,table_name,prots)
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女