1300-黄同学

# Spark数据分析案例之求TopN

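## 数据和目标

目标：按班级（clazz）分组，求每个班级分数排名前 2 的学生。排名使用 dense_rank，分数相同的学生名次并列（例如 3 班的 f 和 g 都是第 1 名，j 是第 2 名）。

数据（每行一个 JSON 对象）：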

{"name":"a","clazz":1,"score":80}
{"name":"b","clazz":1,"score":78}
{"name":"c","clazz":1,"score":95}
{"name":"d","clazz":2,"score":74}
{"name":"e","clazz":2,"score":92}
{"name":"f","clazz":3,"score":99}
{"name":"g","clazz":3,"score":99}
{"name":"h","clazz":3,"score":45}
{"name":"i","clazz":3,"score":55}
{"name":"j","clazz":3,"score":78}

## 代码实现

import org.apache.spark.sql.SparkSession

/** Top-N per group: for every class (`clazz`), list the students whose score rank is within the
  * top [[ScoreAnalysis.TopN]], where ties share a rank (`dense_rank`).
  */
object ScoreAnalysis {

  /** How many distinct score ranks to keep per class. */
  private val TopN: Int = 2

  /** Entry point.
    *
    * @param args args(0) is the path (any URI Spark accepts, e.g. `file:///...`) of a
    *             JSON-lines file with `name`, `clazz` and `score` fields per line.
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the original listing lost the line that loaded `jsonDF`, so the input
    // path is taken from the command line instead of being hard-coded.
    require(args.nonEmpty, "usage: ScoreAnalysis <path to student score JSON-lines file>")
    val inputPath = args(0)

    val sparkSession = SparkSession.builder().appName("ScoreAnalysis")
      .master("local").getOrCreate()
    sparkSession.sparkContext.setLogLevel("WARN")

    try {
      val jsonDF = sparkSession.read.json(inputPath)
      jsonDF.createOrReplaceTempView("student_score")

      // Real line breaks (the original used the literal text "/n", which corrupted the SQL).
      val topScores = sparkSession.sql(
        s"""select t.name, t.clazz, t.score, t.drp
           |from (
           |  select name, clazz, score,
           |         dense_rank() over (partition by clazz order by score desc) drp
           |  from student_score
           |) t
           |where t.drp <= $TopN""".stripMargin)
      topScores.show()
    } finally {
      // Stopping the session also stops its SparkContext.
      sparkSession.stop()
    }
  }
}


# Spark的udf函数

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType

/** Demonstrates registering a Java-API scalar UDF (`UDF1`) and calling it from Spark SQL. */
object MySparkUDF {

  /** Input path used by the original tutorial (a local Windows path); override with args(0). */
  private val DefaultInputPath: String =
    "file:///E://大数据视频//Spark//课件" +
      "//3、spark第三天课上资料//资料//udf.txt"

  /** Entry point.
    *
    * @param args optional args(0): path of a text file to read, one row per line.
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val sparkSession = SparkSession.builder().appName("MySparkUDF")
      .master("local").getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")

    try {
      import sparkSession.implicits._
      // Each text line becomes one row in a single column named `value`.
      val df = sc.textFile(inputPath).toDF()
      df.createOrReplaceTempView("line_tab")

      // Register the UDF with the overload
      //   register(name: String, f: UDF1[_, _], returnType: DataType): Unit
      sparkSession.udf.register("my_upper", new UDF1[String, String] {
        // SQL NULLs reach a Java UDF as null: propagate them instead of throwing an NPE.
        // Locale.ROOT keeps the result independent of the JVM default locale.
        override def call(t1: String): String =
          Option(t1).map(_.toUpperCase(java.util.Locale.ROOT)).orNull
      }, StringType)

      // Call the custom UDF from SQL.
      sparkSession.sql("select value,my_upper(value) upper from line_tab").show()
    } finally {
      // Stopping the session also stops its SparkContext.
      sparkSession.stop()
    }
  }
}


# Spark的udaf函数

{"name":"Michael","salary":3000}
{"name":"Andy","salary":4500}
{"name":"Justin","salary":3500}
{"name":"Berta","salary":4000}

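实现 Spark 的自定义 UDAF 函数分两步：
1. 编写一个继承自 UserDefinedAggregateFunction 的类，实现聚合逻辑。
2. 在 sparkSession 中注册这个类的一个实例。

本例实现了一个求平均工资的 my_avg 函数。注意：从 Spark 3.0 开始，UserDefinedAggregateFunction 已被标记为废弃，新代码推荐继承 org.apache.spark.sql.expressions.Aggregator，并通过 functions.udaf 注册。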

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}

/** Average of a LONG column, returned as DOUBLE.
  *
  * Like SQL `avg`, NULL inputs are ignored. A group with no non-null input yields NULL
  * rather than NaN.
  *
  * Note: `UserDefinedAggregateFunction` is deprecated since Spark 3.0
  * (`Aggregator` + `functions.udaf` is the replacement).
  */
class MySparkUDAF extends UserDefinedAggregateFunction {

  /** Schema of the input: a single LONG column holding the values to average. */
  override def inputSchema: StructType =
    StructType(StructField("input", LongType) :: Nil)

  /** Schema of the intermediate buffer: running sum and count of non-null inputs. */
  override def bufferSchema: StructType =
    StructType(StructField("sum", LongType) :: StructField("total", LongType) :: Nil)

  /** Type of the final result. */
  override def dataType: DataType = DoubleType

  /** The same input always produces the same output. */
  override def deterministic: Boolean = true

  /** Starts every buffer at (sum = 0, count = 0). */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  /** Folds one input row into the buffer. Rows whose value is NULL are skipped. */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Row.getLong throws on a null value, and counting nulls would skew the average.
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0) // running sum
      buffer(1) = buffer.getLong(1) + 1L               // number of values seen
    }
  }

  /** Combines two partial buffers (e.g. from different partitions) into `buffer1`. */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  /** Computes sum / count; returns null (SQL NULL) when no non-null value was seen. */
  override def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(1)
    if (count == 0L) null else buffer.getLong(0).toDouble / count
  }
}

/** Registers [[MySparkUDAF]] as the SQL function `my_avg` and averages employee salaries. */
object MySparkUDAF {

  // NOTE(review): the listing lost the start of this path; the prefix is reconstructed from the
  // UDF example in the same article. Confirm it, or pass the real path as args(0).
  /** Default JSON-lines input (one `{"name":..., "salary":...}` object per line). */
  private val DefaultInputPath: String =
    "file:///E://大数据视频//Spark//课件" +
      "//3、spark第三天课上资料//资料//udaf.txt"

  /** Entry point.
    *
    * @param args optional args(0): path of the JSON-lines employee file.
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val sparkSession = SparkSession.builder().appName("MySparkUDAF")
      .master("local").getOrCreate()
    sparkSession.sparkContext.setLogLevel("WARN")

    try {
      // Read the data (the original listing lost this statement).
      val jsonDF = sparkSession.read.json(inputPath)
      // Register the DataFrame as a temporary view.
      jsonDF.createOrReplaceTempView("employee_table")

      // Register the custom UDAF with the session.
      sparkSession.udf.register("my_avg", new MySparkUDAF)
      // Run the custom UDAF.
      sparkSession.sql("select my_avg(salary) from employee_table").show()
    } finally {
      // Stopping the session also stops its SparkContext.
      sparkSession.stop()
    }
  }
}


Vieu3.3主题

Q Q 登 录