sparkSQL

1227-李同学

发表文章数:84

热门标签

首页 » 大数据 » 正文

Spark SQL概述

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
相比于Spark RDD API,Spark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优化,使对结构化数据的操作更加高效和方便。

特点

  • 易整合

  • 统一的数据访问

  • 兼容hive

  • 标准的数据连接

RDD以及DataFrame以及DataSet

RDD基本介绍

  • RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
  • RDD的最大好处就是简单,API的人性化程度很高。
  • RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。

Dataframe基本概述

与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。
DataFrame也是懒执行的。
性能上比RDD要高,主要有两方面原因:

  • 定制化内存管理
    数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。

sparkSQL

  • 优化的执行计划
    查询计划通过Spark catalyst optimiser进行优化.
    sparkSQL
    Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错.

Dataset基本概述

  1. 是Dataframe API的一个扩展,是Spark最新的数据抽象
  2. 用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
  3. Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
  4. 样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
  5. Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
  6. DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].

三者的共性

1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过.

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map不运行
rdd.map{line=>
  println("运行")
  line._1
}

3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
4、三者都有partition的概念
5、三者有许多共同的函数,如filter,排序等
6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持

import spark.implicits._
7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

DataFrame:

testDF.map{
      case Row(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }

Dataset:

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    testDS.map{
      case Coltest(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }

三者的区别

RDD:

  1. RDD一般和spark mlib同时使用
  2. RDD不支持sparksql操作

DataFrame:

  1. 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如
testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

每一列的值没法直接访问

  1. DataFrame与Dataset一般不与spark ml同时使用
  2. DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如
dataDF.createOrReplaceTempView("tmp")
spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
  1. DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
//保存
val saveoptions = Map("header" -> "true", "delimiter" -> "/t", "path" -> "hdfs://node01:8020/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true", "delimiter" -> "/t", "path" -> "hdfs://node01:8020/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()

利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。

Dataset:

Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段
而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
/**
 rdd
 ("a", 1)
 ("b", 1)
 ("a", 1)
**/
val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS
test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题

DataFrame常用操作

DSL风格语法

DataFrame提供了一个领域特定语言(DSL)来操作结构化数据。
下面是一些使用示例

  1. 查看DataFrame当中的数据
    查看DataFrame中的内容,通过调用show方法
scala> personDF.show
  1. 查看DataFram当中部分字段的数据
  • 第一种方式查看name字段数据
    查看name字段的数据
scala> personDF.select(personDF.col("name")).show
  • 第二种方式查看name字段数据
    查看name字段的另一种写法
scala> personDF.select("name").show
  • 第三种方式查看name和age字段数据
    查看 name 和age字段数据
scala> personDF.select(col("name"),col("age")).show
  • 第四种方式查看字段数据
    查看name和age的数据字段
    通过

    s

    c

    a

    l

    a

    使

    来进行scala当中字符串的引用,可以使用

    scala使来进行字段操作

scala> personDF.select($"name",$"age").show
  1. 打印DataFrame的Schema信息
personDF.printSchema
  1. 查询所有的name和age,并将age+1
personDF.select(col("id"), col("name"), col("age") + 1).show
personDF.select(personDF("id"), personDF("name"), personDF("age") + 1).show
  1. 过滤age大于等于25的,使用filter方法过滤
personDF.filter(col("age") >= 25).show
  1. 统计年龄大于30的人数
personDF.filter(col("age")>30).count()
  1. 按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show

SQL风格语法

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

  1. 将DataFrame注册成表
scala> personDF.registerTempTable("t_person")
warning: there was one deprecation warning; re-run with -deprecation for details
  1. 查询年龄最大的前两名
scala>  spark.sql("select * from t_person order by age desc limit 2 ").show
  1. 显示表的Schema信息
scala> spark.sql("desc t_person").show
  1. 查询年龄大于30的人的信息
scala>  spark.sql("select * from t_person where age > 30").show

标签:

未经允许不得转载:作者:1227-李同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《sparkSQL》 发布于2020-11-11

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录