本文介绍: 到这一步,我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后,我们已经可以通过Athena直接查询到清洗、分区后的数据集了。接下来,我们会通过使用APIGateway+Lambda+Athena来构建一个无服务器的数据查询分析服务。
2 数据清洗、转换
ETL:
大纲
2.1 架构图
2.2 数据清洗
此步会将S3中的原始数据清洗成我们想要的自定义结构的数据。之后,我们可通过APIGateway+Lambda+Athena来实现一个无服务器的数据分析服务。
步骤 | 图例 |
---|---|
1、入口 | |
2、创建Job(s3作为数据源,则Type选择Spark,若为Kinesis等,选择Stream Spark) | |
3、IAM角色需要有s3与Glue的权限 | |
4、选择s3脚本位置,若已经完成脚本的编写工作,则可以选择第二项或第三项,若无则Glue会提供默认脚本 | |
5、安全配置参数 | 建议:添加参数–enable–auto–scaling为true。每次在我们执行Job任务时,会根据运行 ETL 任务的数据处理单元(DPU)的个数来分配动态IP,在我们子网的动态IP数低于DPU数时,Job将会执行失败。此参数将会动态分配IP。 |
6、数据源() | |
7、数据目标(我们会将清洗后的数据存储到新的s3桶) | |
8、设计架构(在本案例中,我们会自定义脚本。所以不再在此处设计架构)(此处设计后,脚本会自动生成相关代码) | |
9、保存 |
2.3 编辑脚本
2.3.1 连接数据源(s3)
#数据源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")
2.3.2. 数据结构转换
mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"),
("datetime", "string", "reading_time", "string"),
("KWH/hh (per half hour)", "double", "reading_value", "double")],
transformation_ctx = "mapped_readings")
2.3.2 数据结构拆分、定义
mapped_readings_df = DynamicFrame.toDF(mapped_readings)
mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))
reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df
.withColumn("week_of_year", weekofyear(reading_time))
.withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", ""))
.withColumn("day_of_month", dayofmonth(reading_time))
.withColumn("month", month(reading_time))
.withColumn("year", year(reading_time))
.withColumn("hour", hour(reading_time))
.withColumn("minute", minute(reading_time))
.withColumn("reading_date_time", reading_time)
.drop("reading_time")
2.3.3 清洗后的数据写入新s3
# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")
s3_clean_path = "s3://" + args['clean_data_bucket']
glueContext.write_dynamic_frame.from_options(
frame = filteredMeterReads,
connection_type = "s3",
connection_options = {"path": s3_clean_path},
format = "parquet",
transformation_ctx = "s3CleanDatasink")
2.3.4 运行作业
执行成功后,状态将变为“SUCCESS”,失败将会给出失败信息,可在CloudWatch 中查看详情
清洗后的数据保存到了s3
数据清洗完毕后,可通过上一篇中的爬网程序步骤,将清洗后的数据的结构创建表到数据目录中,
此时我们可以使用Athena对清洗后的数据进行分析。
2.4 数据分区
接下来我们对数据进行分区处理(此处只提供了按天分区)
重新进行数据清洗中的创建Job操作后,重写脚本
2.4.1 编辑脚本
cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")
business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])
businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource,
connection_type = "s3",
connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},
format = "parquet",
transformation_ctx = "businessZone")
2.4.2 运行脚本
分区后的数据结果:
再次创建、运行爬网程序,将会在数据目录中生成新的分区表。
2.5 总结
到这一步,我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后,我们已经可以通过Athena直接查询到清洗、分区后的数据集了。
接下来,我们会通过使用APIGateway+Lambda+Athena来构建一个无服务器的数据查询分析服务。
原文地址:https://blog.csdn.net/wujiesunlirong/article/details/134791685
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_42156.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。