This article continues the PySpark series with more common DataFrame operations (part 02).
# Review of the common operations from part 01
from pyspark.sql import SparkSession, DataFrame, Row
spark = SparkSession.builder.getOrCreate()
peopleDF = spark.read.json("people.json")
peopleDF.printSchema()  # print the schema of the DataFrame
peopleDF.show()  # display the contents of the DataFrame
peopleDF.foreach(print)  # foreach applies a function (here print) to every Row
def test_foreach(row: Row):  # foreach passes each Row to the function, not the DataFrame itself
    if row['age'] > 20:
        print(row)

peopleDF.foreach(test_foreach)
# The following operations each return a new DataFrame
peopleDF.select(peopleDF['name'])  # select a subset of columns
peopleDF.filter(peopleDF['age'] > 30)  # filter rows by a condition
peopleDF.groupBy(peopleDF['age']).count()  # group by a column and count
peopleDF.sort(peopleDF['age'].asc(), peopleDF['name'].desc())  # sort by age ascending, ties broken by name descending
peopleDF.select(peopleDF['name'].alias('username'))  # rename a column in the projection
peopleDF.withColumn('test_col', peopleDF['age'])  # add a new column
peopleDF.replace('Bob', 'tom', subset=['name'])  # replace values: where name is 'Bob', substitute 'tom'
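Because each of the operations above returns a new DataFrame, they compose naturally. A minimal sketch chaining a few of them (the derived column name age_next_year is just illustrative):
from pyspark.sql import functions as F

peopleDF.filter(F.col('age') > 20) \
    .withColumn('age_next_year', F.col('age') + 1) \
    .sort(F.col('age').asc()) \
    .show()  # each step returns a new DataFrame; nothing runs until show()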
2.11 withColumnRenamed
# withColumnRenamed: renames an existing column
"""
para1: existing: str  the current column name
para2: new: str  the new column name
return: DataFrame
"""
peopleDF.withColumnRenamed("age","age_new").show()
Output:
+-------+----+
|age_new|name|
+-------+----+
| 12| tom|
| 22|jack|
| 33| Bob|
+-------+----+
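withColumnRenamed renames one column per call, so renaming several columns is usually done by chaining calls. A minimal sketch:
# Renaming both columns by chaining withColumnRenamed calls
peopleDF.withColumnRenamed('age', 'age_new') \
        .withColumnRenamed('name', 'name_new') \
        .show()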
2.12 join
# join: joins with another DataFrame, similar to SQL JOIN
"""
para1: other: DataFrame  the other DataFrame
para2: on  the join condition, like ON in SQL; the linking columns of the two DataFrames
para3: how  left / right / inner / full
"""
peopleDF1 = peopleDF
peopleDF.join(peopleDF1, on=(peopleDF1['age'] == peopleDF['age']) & (peopleDF1['name'] == peopleDF['name']), how='inner').show()
Output:
+---+----+---+----+
|age|name|age|name|
+---+----+---+----+
| 12| tom| 12| tom|
| 22|jack| 22|jack|
| 33| Bob| 33| Bob|
+---+----+---+----+
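Note that joining on a Column expression keeps both copies of the join keys, as the duplicated age/name columns above show. Passing on as a list of column names instead keeps a single copy of each key; a minimal sketch:
# Joining on column names deduplicates the join keys in the result
peopleDF.join(peopleDF1, on=['age', 'name'], how='inner').show()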
2.13 count
# count: returns the number of rows as an int
peopleDF.count()  # number of rows in the DataFrame
Output:
3
2.14 drop
# drop: removes a column from the DataFrame
"""
para: column name, str
return: DataFrame
"""
peopleDF.drop('name').show()
Output:
+---+
|age|
+---+
| 12|
| 22|
| 33|
+---+
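In Spark 2.x and later, drop also accepts several column names in one call, and names that do not exist are silently ignored. A minimal sketch:
# Dropping several columns at once; the nonexistent name is ignored without error
peopleDF.drop('name', 'no_such_col').show()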
2.15 take
# take: returns the first N rows of the DataFrame
"""
para1: num  int
return: list of Row
"""
print(peopleDF.take(2))  # first 2 rows of peopleDF
Output:
[Row(age=12, name='tom'), Row(age=22, name='jack')]
Parsing the Row objects:
people_list = peopleDF.take(2)
for people in people_list:
    print(people['age'])
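Each Row can also be converted to a plain Python dict with its asDict method, which is often more convenient downstream. A minimal sketch:
# Converting each Row to an ordinary dict
for people in peopleDF.take(2):
    print(people.asDict())  # e.g. {'age': 12, 'name': 'tom'}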
2.16 distinct
# distinct: returns a new DataFrame containing only the distinct rows
peopleDF.distinct().show()
Output:
+---+----+
|age|name|
+---+----+
| 33| Bob|
| 12| tom|
| 22|jack|
+---+----+
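distinct compares entire rows. To deduplicate on a subset of columns only, dropDuplicates accepts a column list; a minimal sketch:
# Keeping one row per distinct name, regardless of the other columns
peopleDF.dropDuplicates(subset=['name']).show()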
2.17 union
# union: vertically concatenates two DataFrames; both must have the same number of columns
"""
para1: other: DataFrame  the other DataFrame
return: DataFrame
"""
peopleDF.union(peopleDF).show()
Output:
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33| Bob|
| 12| tom|
| 22|jack|
| 33| Bob|
+---+----+
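union matches columns by position, not by name. If the two DataFrames have the same columns in a different order, unionByName (available since Spark 2.3) matches by name instead; a minimal sketch:
# unionByName lines columns up by name, so the reordered DataFrame still stacks correctly
reordered = peopleDF.select('name', 'age')
peopleDF.unionByName(reordered).show()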
2.18 first
# first: returns the first row of the DataFrame
print(peopleDF.first())  # returns a Row
Output:
Row(age=12, name='tom')
2.19 createOrReplaceTempView
# createOrReplaceTempView: registers the DataFrame as a temporary view
"""
para: name  str, the name of the registered view
return: None
"""
peopleDF.createOrReplaceTempView("people")
result = spark.sql("select * from people")
result.show()
Output:
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33| Bob|
+---+----+
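The registered view supports the full Spark SQL syntax, not just SELECT *; a minimal sketch with a filter and an aggregate:
spark.sql("select name from people where age > 20").show()
spark.sql("select count(*) as cnt from people").show()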
2.20 repartition
# repartition: returns a new DataFrame partitioned by the given partitioning expressions
"""
para1: numPartitions  int, the target number of partitions; if unspecified, the default parallelism is used
para2: cols  str or Column, the partitioning column(s)
return: DataFrame
"""
print('Partitions before repartition: {}'.format(peopleDF.rdd.getNumPartitions()))
peopleDF_new = peopleDF.repartition(3, 'name')
print('Partitions after repartition: {}'.format(peopleDF_new.rdd.getNumPartitions()))
Output:
Partitions before repartition: 1
Partitions after repartition: 3
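repartition always performs a full shuffle. When you only need to reduce the number of partitions, coalesce avoids the shuffle (it cannot increase the count); a minimal sketch:
# coalesce merges partitions without a full shuffle
peopleDF_small = peopleDF_new.coalesce(1)
print(peopleDF_small.rdd.getNumPartitions())  # 1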
2.21 rdd
# rdd: converts the DataFrame into an RDD
peopleRDD = peopleDF.rdd
print(type(peopleRDD))
Output:
<class 'pyspark.rdd.RDD'>
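The resulting RDD contains Row objects, so ordinary RDD operations apply and fields remain accessible by name; a minimal sketch:
# Extracting one field from every Row via a plain RDD transformation
print(peopleRDD.map(lambda row: row['name']).collect())  # ['tom', 'jack', 'Bob']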
2.22 toDF
# toDF: converts an RDD into a DataFrame
"""
para1: schema  the column names, as a list; names are applied positionally, so they must follow the Rows' field order
para2: sampleRatio  the sampling ratio used to infer column types; by default the first 100 rows are used
"""
peopleRdd = peopleDF.rdd
peopleRdd.toDF(schema=['age', 'name'], sampleRatio=0.5).printSchema()
Output:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
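If you want full control over the column types instead of sampling-based inference, spark.createDataFrame accepts an explicit StructType; a minimal sketch:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Building the DataFrame with an explicit schema; field order must match the Rows (age, name)
schema = StructType([
    StructField('age', LongType(), True),
    StructField('name', StringType(), True),
])
spark.createDataFrame(peopleRdd, schema=schema).printSchema()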
2.23 collect
# collect: returns the DataFrame contents as a list of Rows, pulling all data from the remote executors to the driver
print(peopleDF.collect())
Output:
[Row(age=12, name='tom'), Row(age=22, name='jack'), Row(age=33, name='Bob')]
2.24 persist/unpersist
# persist: persists the DataFrame; unpersist: releases the persisted data; cache: persists with the default level (calls persist)
# is_cached: attribute indicating whether the DataFrame is cached
"""
persist parameter:
storageLevel: the storage level (useDisk, useMemory, useOffHeap, deserialized, replication=1)
The default storage level is MEMORY_AND_DISK (True, True, False, False, 1)
"""
persist_peopleDF = peopleDF.persist()
print(persist_peopleDF.is_cached)
persist_peopleDF_1 = persist_peopleDF.unpersist()
print(persist_peopleDF_1.is_cached)
Output:
True
False
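persist also accepts an explicit StorageLevel instead of the default; a minimal sketch:
from pyspark import StorageLevel

# Persisting in memory only, then checking and releasing the cache
peopleDF.persist(StorageLevel.MEMORY_ONLY)
print(peopleDF.storageLevel)  # shows the storage level currently in effect
peopleDF.unpersist()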
2.25 fillna
# fillna: fills null values in the given columns
"""
para1: value  the fill value
para2: subset  the column name(s), may be several columns
return: DataFrame  returns a new DataFrame
"""
Modify the source data (people.json) as follows:
{"name":"tom","age":12,"year":2012}
{"name":"jack"}
{"name":"Bob","age":33}
peopleDF.fillna(value=0, subset=['age', 'year']).show()
Output:
+---+----+----+
|age|name|year|
+---+----+----+
| 12| tom|2012|
| 0|jack| 0|
| 33| Bob| 0|
+---+----+----+
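fillna also accepts a dict that maps column names to per-column fill values, replacing the value/subset pair; a minimal sketch:
# Different fill values per column; keys that match no column are ignored
peopleDF.fillna({'age': 0, 'year': 1970}).show()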
Original article: https://blog.csdn.net/qq_37239381/article/details/135841945