原创/朱季谦

image

本文适合入门Spark RDD的计算处理

日常工作当中,经常遇到基于Spark读取存储在HDFS中的批量文件数据进行统计分析案例,这些文件一般以csv或者txt文件格式存在例如存在这样一份消费者行为数据字段包括消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机。

基于这份消费者行为数据,往往会有以下一些分析目标

针对这些需求,就可以使用Spark读取文件后,进一步分析处理统计。

接下来,就是针对以上分析目标设计一番Spark代码计算逻辑,由此可入门学习下Spark RDD常用用法

获取一份具备以下字段csv随机样本,总共5246条数据,包括“消费者姓名,年龄,性别,月薪,消费偏好,消费领域,购物平台,支付方式,单次购买商品数量,优惠券获取情况,购物动机”。

Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
......

样本存放项目目录src/main/resources/consumerdata.csv然后新建一个Scala的object类,创建一个main方法, 模拟从HDSF读取数据然后通过.map(_.split(“,”))将csv文件一行切割成一个数组形式的RDD

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("consumer")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val filePath: String = "src/main/resources/consumerdata.csv"
    val consumerRDD = ss.sparkContext.textFile(filePath).map(_.split(","))

可以写一段代码打印看一下consumerRDD结构——

    consumerRDD.foreach(x => {
      x.foreach(y => print(y +" "))
      println()
    })

打印结果如下——

image

这个RDD相当于把每一行当作里一个Array[]数组第一行的Array0是消费者姓名,即Amy Harris,Array1是年龄,即39,以此类推。

消费者姓名 年龄 性别 月薪 消费偏好 消费领域 购物平台 支付方式 单次购买商品数量 优惠券获取情况 购物动机
Amy Harris 39 18561 性价比 家居用品 天猫 微信支付 10 折扣优惠 品牌忠诚
Lori Willis 33 14071 功能 家居用品 苏宁易购 货到付款 1 折扣优惠 日常使用
。。。

获取到该RDD后,就可以进行下一步的统计分析了。

一、统计消费者支付方式偏好分布

这行代码意思,x.apply(7)表示取每一行的第八个字段,相当数组Array[7],第八个字段是【支付方式】。

因此就可以按照以上格式,对文本数据里的每一个字段做相应分析,后文其他计算逻辑也是类似。

consumerRDD.map(x => (x.apply(7),1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

打印结果如下

image

二、统计购物平台偏好分布

x.apply(5)表示取每一行的第六个字段,相当数组Array[5],第六个字段是【购物平台】。

同前文的【统计消费者支付方式偏好分布】一样,通过map(x=>(x.apply(5),1))生成(购物平台,1)格式的RDD,然后通过reduceByKey算子针对相同key做统计,最后倒序排序

consumerRDD.map(x => (x.apply(5), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

打印结果——

image

三、统计购物偏好方式分布

x.apply(4)表示取每一行的第五个字段,相当数组Array[4],第五个字段是【消费领域】。

consumerRDD.map(x => (x.apply(4), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

打印结果

image

四、统计购物动机分布

x.apply(10)表示取每一行的第十个字段,相当数组Array[10],第10个字段是【购物动机】。

consumerRDD.map(x => (x.apply(10), 1)).reduceByKey(_ + _).sortBy(_._2, false).foreach(println)

打印结果——

image

五、消费者年龄分布

需求通过将RDD映射成DataFrame数据集,方便用SQL语法处理,按照年龄区间分区,分别为”0-20″,”21-30″,”31-40″

……这个分区字符串名,就相当keyvalue表示落在该分区用户数量。这时,就可以分组聚合统计了,统计出各个年龄段的消费者数量。

//取出consumerRDD每一行数需要的字段
val rowRDD = consumerRDD.map{
  x => Row(x.apply(0),x.apply(1).toInt,x.apply(2),x.apply(3).toInt,x.apply(4),x.apply(5),x.apply(6),x.apply(7),x.apply(8).toInt,x.apply(9),x.apply(10))
}

//设置字段映射
val schema = StructType(Seq(
  StructField("consumerName", StringType),
  StructField("age", IntegerType),
  StructField("gender", StringType),
  StructField("monthlyIncome", IntegerType),
  StructField("consumptionPreference", StringType),
  StructField("consumptionArea", StringType),
  StructField("shoppingPlatform", StringType),
  StructField("paymentMethod", StringType),
  StructField("quantityOfItemsPurchased", IntegerType),
  StructField("couponAcquisitionStatus", StringType),
  StructField("shoppingMotivation", StringType)

))
val df = ss.createDataFrame(rowRDD, schema).toDF()
//按年龄分布计算
val agedf = df.withColumn("age_range",
  when(col("age").between(0, 20), "0-20")
    .when(col("age").between(21, 30), "21-30")
    .when(col("age").between(31, 40), "31-40")
    .when(col("age").between(41, 50), "41-50")
    .when(col("age").between(51, 60), "51-60")
    .when(col("age").between(61, 70), "61-70")
    .when(col("age").between(81, 90), "81-90")
    .when(col("age").between(91, 100), "91-100")
    .otherwise("Unknow")
)
//分组统计
val result = agedf.groupBy("age_range").agg(count("consumerName").alias("Count")).sort(desc("Count"))
result.show()

打印结果:

image

六、统计年龄分布

类似年龄分布的操作

val sexResult = agedf.groupBy("gender").agg(count("consumerName").alias("Count")).sort(desc("Count"))
sexResult.show()

打印结果:

image

除了以上的统计分析案例之外,还有优惠券获取情况和购物动机的关系、消费领域方式等统计,可以一步拓展分析。

本文基于分析消费者行为数据,可以入门学习到,Spark如何读取样本文件通过map(_.split(“,”))处理样本成一个数组格式的RDD,基于该RDD,可以进一步通过map、reduceByKey、groupBy等算子处理与统计,最后获取该样本信息价值。

原文地址:https://blog.csdn.net/weixin_40706420/article/details/134760764

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_35078.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注