本文介绍: 自定义聚合函数类:计算年龄的平均值继承org.apache.spark.sql.expressions.Aggregator, 定义泛型IN : 输入的数据类型 LongBUF : 缓冲区的数据类型 Buff ->样例类OUT : 输出的数据类型 Long重写方法(6个)/*** 自定义聚合函数类:计算年龄的平均值* 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型* IN : 输入的数据类型 Long。
一、UDF(User-Defined-Function)
1、注册UDF
udf对象 = spark.udf.register( 参数1, 参数2, 参数3)
参数1:UDF名称,可用于SQL风格
//获取系统时间
val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private val startTimeID: UserDefinedFunction = spark.udf.register("STARTTIMEID", (s: String) => {
s + " time:"+df.format(new Date())
})
2、 DSL使用UDF函数
ruleFrame.select(startTimeID($"API_CODE"),$"API_ID").show()
3、SparkSQL使用UDF
ruleFrame.createOrReplaceTempView("AAAS_RULE_20220316")
spark.sql("select STARTTIMEID(API_CODE),API_CODE from AAAS_RULE_20220316").show()
二、UDAF 用户自定义聚合函数
1、自定义UDAF函数类–传入类型为样例类
继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
IN : 输入的数据类型 Long
BUF : 缓冲区的数据类型 Buff ->样例类
OUT : 输出的数据类型 Long
重写方法(6个)
package Util
import bean.{UDAFAgeFunction, UserInfo}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
/**
* 自定义聚合函数类:计算年龄的平均值
* 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
* IN : 输入的数据类型 Long
* BUF : 缓冲区的数据类型 Buff
* OUT : 输出的数据类型 Long
* 2. 重写方法(6个)
*/
class MyAvgUDAF extends Aggregator[UserInfo,UDAFAgeFunction,Long]{
// z & zero : 初始值或零值
// 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
override def zero: UDAFAgeFunction = {
UDAFAgeFunction(0L,0L)
}
//根据输入的数据更新缓冲区的数据,缓冲区的计算方式
//入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
override def reduce(buffer: UDAFAgeFunction, userInfo: UserInfo): UDAFAgeFunction = {
buffer.ageSum=buffer.ageSum+userInfo.age
buffer.cnt=buffer.cnt+1
buffer
}
//合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
}
//计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
override def finish(buffer: UDAFAgeFunction): Long = {
buffer.ageSum/buffer.cnt
}
// 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLonscalaInt等类型
override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
// 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLonscalaInt等类型
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
2、自定义UDAF函数类–传入类型为数据类型Int等
package Util
import bean.UDAFAgeFunction
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
/**
* 自定义聚合函数类:计算年龄的平均值
* 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
* IN : 输入的数据类型 Long
* BUF : 缓冲区的数据类型 Buff
* OUT : 输出的数据类型 Long
* 2. 重写方法(6个)
*/
class MyAvgUDAFNew extends Aggregator[Int,UDAFAgeFunction,Long]{
// z & zero : 初始值或零值
// 缓冲区的初始化,需要更改入参:缓冲区UDAFAgeBuffer样例类,给个空值
override def zero: UDAFAgeFunction = {
UDAFAgeFunction(0L,0L)
}
//根据输入的数据更新缓冲区的数据,缓冲区的计算方式
//入参:缓冲区UDAFAgeBuffer,输入数据UserInfo
override def reduce(buffer: UDAFAgeFunction, age: Int): UDAFAgeFunction = {
buffer.ageSum=buffer.ageSum+age
buffer.cnt=buffer.cnt+1
buffer
}
//合并缓冲区,sumAge+sumAge,cnt+cnt,入参是不同缓冲区的数据
override def merge(buffer1: UDAFAgeFunction, buffer2: UDAFAgeFunction): UDAFAgeFunction = {
UDAFAgeFunction(buffer1.ageSum+buffer2.ageSum,buffer1.cnt+buffer2.cnt)
}
//计算结果缓冲区合并后对缓冲区数据类型做计算,入参是缓冲区数据,返回值是计算结果,这里是平均年龄
override def finish(buffer: UDAFAgeFunction): Long = {
buffer.ageSum/buffer.cnt
}
// 缓冲区的编码操作,变更入参为缓冲区,如果是样例类就是Encoders.product,否则就是对应的scalaLonscalaInt等类型
override def bufferEncoder: Encoder[UDAFAgeFunction] = Encoders.product
// 输出的编码操作,入参为计算结果(Long),如果是样例类就是Encoders.product,否则就是对应的scalaLonscalaInt等类型
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
3、二者使用中的区别
package SparkSQL.UDF
import Util.{Env, MyAvgUDAF, MyAvgUDAFNew}
import bean.UserInfo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{Dataset, SparkSession, TypedColumn, functions}
object UDAFDemo extends App with Env{
/*
* 自定义聚合函数
* 强类型聚合函数 Aggregator
* */
Logger.getLogger("org").setLevel(Level.ERROR)
//准备spark环境
private val sc: SparkContext = getSparkContext()
//准备SparkSession环境,SparkSession 是 Spark 最新的 SQL 查询起始点
private val spark: SparkSession = getSparkSession()
import spark.implicits._
//创建DataSet
private val userInfoRdd = sc.textFile("src/data/userInfo.txt").map(data=>UserInfo(
data.split(" ")(0),data.split(" ")(1).toInt)
)
private val userInfoSet: Dataset[UserInfo] = userInfoRdd.toDF().as[UserInfo]
userInfoSet.createOrReplaceTempView("userInfo")
println("############传参为有类型的UDAF###################")
//因为myAvgUDAFFunction传入的是样例类UserInfo,所以不需要处理select的字段,而是把整个函数当成列数据传入,直接查询结果
//不注册 将UDAF函数转换为查询的列对象
private val myAvgUDAFCol: TypedColumn[UserInfo, Long] = new MyAvgUDAF().toColumn
println(" DSl风格结果")
userInfoSet.select(myAvgUDAFCol).show
println(" SQl风格不支持查询列对象")
println("############传参为无类型的UDAF###################")
//替换UDAF的入参,UserInfo改为(Int)就可以通过select myAvgUDAFFunction(name,age) from table 查询
//注册UDAF,方式和UDF一致
private val myAvgUDAFNew = new MyAvgUDAFNew
private val myAvgAgeNew: UserDefinedFunction = spark.udf.register("MYAvgAgeNew", functions.udaf(myAvgUDAFNew))
println(" DSl风格结果")
userInfoSet.select(myAvgAgeNew($"age")).show()
println(" sql风格结果")
spark.sql("select MYAvgAgeNew(age) from userInfo").show()
sc.stop()
spark.stop()
}
原文地址:https://blog.csdn.net/qq_40607631/article/details/132670014
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_2035.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。