本文介绍: UDF: 一对一UDAF: 多对一UDTF: 一对多spark sql原生python只能写udf, 借助pandas等第三方组件就可以写udf和udaf。

目录

一 . 开窗函数

二 . SparkSQL函数定义

        1. HIVE_SQL用户自定义函数

        2.Spark原生UDF

        3. pandasUDF

        4. pandasUDAF

三. Spark on HIVE

四.SparkSQL的执行流程


一 . 开窗函数

               分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

        分析函数可以大致分成如下3类:
        1- 第一类: 聚合函数 sum() count() avg() max() min()
        2- 第二类: row_number() rank() dense_rank() ntile()
        3- 第三类: first_value() last_value() lead() lag()

排序 :

        row_number: 巧记: 1234   特点: 唯一且连续

        dense_rank: 巧记: 1223   特点: 并列且连续

        rank   : 巧记: 1224   特点: 并列不连续

二 . SparkSQL函数定义

        1. HIVE_SQL用户自定义函数

                UDF: 一对一

                UDAF: 多对一

                UDTF: 一对多

        spark sql原生python只能写udf, 借助pandas等第三方组件就可以写udf和udaf

        2.Spark原生UDF

                第一步:创建python函数

                第二步: 将python函数注册到Spark sql中

                                注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)

                                注册方式二(只能DSL):  udf对象 = F.udf(参数1,参数2)

                                注册方式三(只能DSL):  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面

                返回基本类型

                返回复杂类型

        3. pandasUDF

                Arrow是一种内存中的列式数据格式,提升跨语言数据传输速度,提升大数据分析项目的运行效率  

                基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

                Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

        4. pandasUDAF

                 UDAF对自定义Python函数的要求: 输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型

三. Spark on HIVE

        spark on hive的目的就是替换hive中的hive server 2服务

启动 hadoop服务:

        start-all.sh

hive开启metastore服务命令:

        cd /export/server/hive/bin

        nohup ./hive –service metastore &

启动Spark-sql的命令:

        cd /export/server/spark

        ./spark-sql

        bin/spark-sql –master local  –executor-memory 512m –total-executor-cores 1 

启动hive的命令:

        cd /export/server/hive/bin

        ./hive

启动Spark 提供的分布式的执行引擎:spark的Thrift服务项命令

如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务

cd /export/server/spark/sbin

./start-thriftserver.sh
–hiveconf hive.server2.thrift.port=10000
–hiveconf hive.server2.thrift.bind.host=node1
–hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse
–master local[2]

 

四.SparkSQL的执行流程

 

1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树

2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(未优化的)逻辑计划

3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪
    3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理的数据量,提升效率。
    3.2- 列值裁剪: 将一张表中与数据分析不相关的字段不加载进来,只加载数据分析用到的字段。减少后续处理的数据量,提升效率。
    
4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划

5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码

6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。

原文地址:https://blog.csdn.net/m0_49956154/article/details/135495384

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_54468.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注