目录
一 . 开窗函数
分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])
分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: row_number() rank() dense_rank() ntile()
3- 第三类: first_value() last_value() lead() lag()排序 :
row_number: 巧记: 1234 特点: 唯一且连续
dense_rank: 巧记: 1223 特点: 并列且连续
rank : 巧记: 1224 特点: 并列不连续
二 . SparkSQL函数定义
1. HIVE_SQL用户自定义函数
UDF: 一对一
UDAF: 多对一
UDTF: 一对多
spark sql原生python只能写udf, 借助pandas等第三方组件就可以写udf和udaf
2.Spark原生UDF
第一步:创建python函数
第二步: 将python函数注册到Spark sql中
注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
注册方式二(只能DSL): udf对象 = F.udf(参数1,参数2)
注册方式三(只能DSL): 语法糖写法 @F.udf(returnType=返回值类型) 放置到对应Python的函数上面
返回基本类型
返回复杂类型
3. pandasUDF
Arrow是一种内存中的列式数据格式,提升跨语言数据传输速度,提升大数据分析项目的运行效率
基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。
Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型
4. pandasUDAF
UDAF对自定义Python函数的要求: 输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型
三. Spark on HIVE
spark on hive的目的就是替换hive中的hive server 2服务
启动 hadoop服务:
start-all.sh
hive开启metastore服务命令:
cd /export/server/hive/bin
nohup ./hive –service metastore &
启动Spark-sql的命令:
cd /export/server/spark
./spark-sql
bin/spark-sql –master local –executor-memory 512m –total-executor-cores 1
启动hive的命令:
cd /export/server/hive/bin
./hive
启动Spark 提供的分布式的执行引擎:spark的Thrift服务项命令
如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务
cd /export/server/spark/sbin
./start-thriftserver.sh
–hiveconf hive.server2.thrift.port=10000
–hiveconf hive.server2.thrift.bind.host=node1
–hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse
–master local[2]
四.SparkSQL的执行流程
1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树
2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(未优化的)逻辑计划
3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪
3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理的数据量,提升效率。
3.2- 列值裁剪: 将一张表中与数据分析不相关的字段不加载进来,只加载数据分析用到的字段。减少后续处理的数据量,提升效率。
4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划
5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码
6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。
原文地址:https://blog.csdn.net/m0_49956154/article/details/135495384
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_54468.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!