Spark(第六节—2)Spark数据分析案例之求TopN,Spark的udf与udaf函数

1300-黄同学

发表文章数:85

热门标签

首页 » 大数据 » 正文

Spark数据分析案例之求TopN

思路

将数据读成DataFrame,并将DataFrame映射成临时表,然后用sparkSession.sql的方式用sql语句来求TopN。

数据和目标

数据是如下的json数据,可以直接读成DataFrame,求每个clazz中score的前两名

{“name”:“a”,“clazz”:1,“score”:80}
{“name”:“b”,“clazz”:1,“score”:78}
{“name”:“c”,“clazz”:1,“score”:95}
{“name”:“d”,“clazz”:2,“score”:74}
{“name”:“e”,“clazz”:2,“score”:92}
{“name”:“f”,“clazz”:3,“score”:99}
{“name”:“g”,“clazz”:3,“score”:99}
{“name”:“h”,“clazz”:3,“score”:45}
{“name”:“i”,“clazz”:3,“score”:55}
{“name”:“j”,“clazz”:3,“score”:78}

代码实现

import org.apache.spark.sql.SparkSession

object ScoreAnalysis {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("ScoreAnalysis")
      .master("local").getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    val jsonDF = sparkSession.read.json("score.txt")
    jsonDF.createOrReplaceTempView("student_score")

    sparkSession.sql("select t.name,t.clazz,t.score,t.drp from/n" +
      "(select name,clazz,score,/n" +
      "dense_rank() over(partition by clazz order by score desc ) drp/n" +
      "from student_score) t where t.drp<=2").show()

    sc.stop()
    sparkSession.close()
  }
}

Spark的udf函数

实现自定义一个udf将小写字母转为大写

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType

object MySparkUDF {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("MySparkUDF")
      .master("local").getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    val textRDD = sc.textFile("file:///E://大数据视频//Spark//课件" +
      "//3、spark第三天课上资料//资料//udf.txt")
    import sparkSession.implicits._
    val df = textRDD.toDF()
    df.createOrReplaceTempView("line_tab")

    // 将自定义udf函数注册到sparkSession
    /*
    源代码:
    def register(name : scala.Predef.String,
    f : org.apache.spark.sql.api.java.UDF1[_, _],
    returnType : org.apache.spark.sql.types.DataType) : scala.Unit =
    { /* compiled code */ }
     */
    sparkSession.udf.register("my_upper", new UDF1[String, String] {
      override def call(t1: String): String = t1.toUpperCase
    }, StringType)
    // 使用自定义的udf函数
    sparkSession.sql("select value,my_upper(value) upper from line_tab").show()

    sc.stop()
    sparkSession.close()
  }
}

Spark的udaf函数

实现自定义一个udaf函数来求平均值。
数据如下,是json数据,读成DataFrame,求salary的平均值:

{“name”:“Michael”,“salary”:3000}
{“name”:“Andy”,“salary”:4500}
{“name”:“Justin”,“salary”:3500}
{“name”:“Berta”,“salary”:4000}

Spark的自定义udaf函数,首先写一个继承自UserDefinedAggregateFunction的类来完成算法逻辑,然后在sprakSession中注册这个udaf函数类的实例即可。

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

class MySparkUDAF  extends UserDefinedAggregateFunction{
  //输入的数据类型的schema
  override def inputSchema: StructType = {
    StructType(StructField("input",LongType)::Nil)
  }
  //缓冲区数据类型schema,说白了就是转换之后的数据的schema
  override def bufferSchema: StructType = {
    StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil)
  }
  //返回值的数据类型
  override def dataType: DataType = {
    DoubleType
  }
  //确定是否相同的输入会有相同的输出
  override def deterministic: Boolean = {
    true
  }
  //初始化内部数据结构,初始为类似(0L,0L)的形式
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  //更新数据内部结构
  // getLong(0)意思是获取集合的第一个值并转为Long,getLong(1)意思是获取集合的第二个值并转为Long
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //所有的金额相加
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    //一共有多少条数据
    buffer(1) = buffer.getLong(1) + 1
  }
  //来自不同分区的数据进行合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  //计算输出数据值
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}


object MySparkUDAF {
  def main(args: Array[String]): Unit = {
    val sparkSession=SparkSession.builder().appName("MySparkUDAF")
      .master("local").getOrCreate()
    val sc=sparkSession.sparkContext
    sc.setLogLevel("WARN")

    // 读取数据
    val jsonDF=sparkSession.read.json("file:///E://大数据视频//Spark//课件" +
      "//3、spark第三天课上资料//资料//udaf.txt")
    // 将df注册成临时表
    jsonDF.createOrReplaceTempView("employee_table")

    // 将自定义udaf函数注册到sparkSession
    sparkSession.udf.register("my_avg",new MySparkUDAF)
    //sparkSession.sql("select * from employee_table").show()
    // 试运行自定义udaf函数
    sparkSession.sql("select my_avg(salary) from employee_table").show()

    sc.stop()
    sparkSession.close()
  }
}
标签:

未经允许不得转载:作者:1300-黄同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《Spark(第六节—2)Spark数据分析案例之求TopN,Spark的udf与udaf函数》 发布于2021-02-20

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录