原创/朱季谦
在日常工作当中,经常遇到基于Spark去读取存储在HDFS中的批量文件数据进行统计分析的案例,这些文件一般以csv或者txt文件格式存在。例如,存在这样一份消费者行为数据,字段包括消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机。
- 用户统计学分析:针对性别、年龄等属性进行统计分析,了解消费者群体的组成和特征。
- 收入与购买行为的关系分析:通过比较月薪和单次购买商品数量之间的关系,探索收入水平对消费行为的影响。
- 消费偏好和消费领域的分析:查看不同消费者的消费偏好(例如性价比、功能性、时尚潮流等)和消费领域(例如家居用品、汽车配件、美妆护肤等),以了解他们的兴趣和偏好。
- 购物平台和支付方式的分析:研究购物平台(例如天猫、淘宝、拼多多等)和支付方式(例如微信支付、支付宝等)的选择情况,了解消费者在电商平台上的偏好。
- 优惠券获取情况和购物动机的关系:观察优惠券获取情况和购物动机之间的联系,探索消费者是否更倾向于使用优惠券进行购物。
针对这些需求,就可以使用Spark来读取文件后,进一步分析处理统计。
接下来,就是针对以上分析目标,设计一番Spark代码计算逻辑,由此可入门学习下Spark RDD常用用法。
获取一份具备以下字段的csv随机假样本,总共5246条数据,包括“消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机”。
Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
......
将样本存放到项目目录为src/main/resources/consumerdata.csv,然后新建一个Scala的object类,创建一个main方法, 模拟从HDSF读取数据,然后通过.map(_.split(“,”))将csv文件每一行切割成一个数组形式的RDD
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("consumer")
val ss = SparkSession.builder().config(conf).getOrCreate()
val filePath: String = "src/main/resources/consumerdata.csv"
val consumerRDD = ss.sparkContext.textFile(filePath).map(_.split(","))
consumerRDD.foreach(x => {
x.foreach(y => print(y +" "))
println()
})
这个RDD相当于把每一行当作里一个Array[]数组,第一行的Array0是消费者姓名,即Amy Harris,Array1是年龄,即39,以此类推。
消费者姓名 | 年龄 | 性别 | 月薪 | 消费偏好 | 消费领域 | 购物平台 | 支付方式 | 单次购买商品数量 | 优惠券获取情况 | 购物动机 |
---|---|---|---|---|---|---|---|---|---|---|
Amy Harris | 39 | 男 | 18561 | 性价比 | 家居用品 | 天猫 | 微信支付 | 10 | 折扣优惠 | 品牌忠诚 |
Lori Willis | 33 | 女 | 14071 | 功能性 | 家居用品 | 苏宁易购 | 货到付款 | 1 | 折扣优惠 | 日常使用 |
。。。 |
一、统计消费者支付方式偏好分布
这行代码意思,x.apply(7)表示取每一行的第八个字段,相当数组Array[7],第八个字段是【支付方式】。
- map(x=>(x.apply(7),1))表示是对RDD里每一行出现过的支付方式字段设置为1个,例如,第一行把原本数组格式Array的RDD做了转换,生成(微信支付,1)格式的新RDD,表示用微信支付的用户出现了1次。
- reduceByKey(_ + _)表示按RDD的key进行聚合统计,表示统计微信支付出现的次数,支付宝出现的次数等。最后,通过
- sortBy(_._2,false)表示按照key–value当中的value进行倒序排序,false表示倒叙,true表示升序。
因此就可以按照以上格式,对文本数据里的每一个字段做相应分析,后文其他计算逻辑也是类似。
consumerRDD.map(x => (x.apply(7),1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
二、统计购物平台偏好分布
x.apply(5)表示取每一行的第六个字段,相当数组Array[5],第六个字段是【购物平台】。
同前文的【统计消费者支付方式偏好分布】一样,通过map(x=>(x.apply(5),1))生成(购物平台,1)格式的RDD,然后再通过reduceByKey算子针对相同的key做统计,最后倒序排序。
consumerRDD.map(x => (x.apply(5), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
三、统计购物偏好方式分布
x.apply(4)表示取每一行的第五个字段,相当数组Array[4],第五个字段是【消费领域】。
consumerRDD.map(x => (x.apply(4), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
四、统计购物动机分布
x.apply(10)表示取每一行的第十个字段,相当数组Array[10],第10个字段是【购物动机】。
consumerRDD.map(x => (x.apply(10), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)
五、消费者年龄分布
该需求通过将RDD映射成DataFrame数据集,方便用SQL语法处理,按照年龄区间分区,分别为”0-20″,”21-30″,”31-40″
……这个分区字符串名,就相当key,value表示落在该分区的用户数量。这时,就可以分组做聚合统计了,统计出各个年龄段的消费者数量。
//取出consumerRDD每一行数组需要的字段
val rowRDD = consumerRDD.map{
x => Row(x.apply(0),x.apply(1).toInt,x.apply(2),x.apply(3).toInt,x.apply(4),x.apply(5),x.apply(6),x.apply(7),x.apply(8).toInt,x.apply(9),x.apply(10))
}
//设置字段映射
val schema = StructType(Seq(
StructField("consumerName", StringType),
StructField("age", IntegerType),
StructField("gender", StringType),
StructField("monthlyIncome", IntegerType),
StructField("consumptionPreference", StringType),
StructField("consumptionArea", StringType),
StructField("shoppingPlatform", StringType),
StructField("paymentMethod", StringType),
StructField("quantityOfItemsPurchased", IntegerType),
StructField("couponAcquisitionStatus", StringType),
StructField("shoppingMotivation", StringType)
))
val df = ss.createDataFrame(rowRDD, schema).toDF()
//按年龄分布计算
val agedf = df.withColumn("age_range",
when(col("age").between(0, 20), "0-20")
.when(col("age").between(21, 30), "21-30")
.when(col("age").between(31, 40), "31-40")
.when(col("age").between(41, 50), "41-50")
.when(col("age").between(51, 60), "51-60")
.when(col("age").between(61, 70), "61-70")
.when(col("age").between(81, 90), "81-90")
.when(col("age").between(91, 100), "91-100")
.otherwise("Unknow")
)
//分组统计
val result = agedf.groupBy("age_range").agg(count("consumerName").alias("Count")).sort(desc("Count"))
result.show()
打印结果:
六、统计年龄分布
类似年龄分布的操作。
val sexResult = agedf.groupBy("gender").agg(count("consumerName").alias("Count")).sort(desc("Count"))
sexResult.show()
打印结果:
除了以上的统计分析案例之外,还有优惠券获取情况和购物动机的关系、消费领域方式等统计,可以进一步拓展分析。
本文基于分析消费者行为数据,可以入门学习到,Spark如何读取样本文件,通过map(_.split(“,”))处理样本成一个数组格式的RDD,基于该RDD,可以进一步通过map、reduceByKey、groupBy等算子做处理与统计,最后获取该样本的信息价值。
原文地址:https://blog.csdn.net/weixin_40706420/article/details/134760764
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_35078.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!