This article covers: stitching two Datasets with different structures and row counts into one wide Dataset by giving each of them an auto-incrementing numbering column. The quickest such number is each row's index, which means switching to the RDD API: zipWithIndex() produces a JavaPairRDD&lt;Row, Long&gt;, which we flip into a JavaPairRDD&lt;Long, Row&gt; in preparation for the later join, because join matches records on the T field of a JavaPairRDD&lt;T, M&gt;. A new Dataset is then created from the joined result.
1. First, load the CSV files stored in HDFS (or locally) as Datasets
First, prepare two CSV files on the local C: drive.
test.csv
client_id,behives,del,normal_status,cust_type,no_trd_days
7056,zl,1,hy,个人,2
7057,cf,1,hy,个人,12
7058,hs,2,hy,个人,1200
212121,0,sj,hy,个人,1100
212122,1,yx,hy,个人,100
212123,1,ls,hy,个人,100
test1.csv
cust_no,code,hight
sw7056,66661,125
es7057,66661,156
wq7058,66661,165
Dataset<Row> a = createRealView(session, "file:///C:\\test.csv");
Dataset<Row> b = createRealView(session, "file:///C:\\test1.csv");
a.createOrReplaceTempView("a");
b.createOrReplaceTempView("b");
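All the snippets assume an existing SparkSession named session. A minimal sketch of how it might be created (the app name and the local master are illustrative choices for running the demo on a workstation, not part of the original code):

SparkSession session = SparkSession.builder()
        .appName("zip-index-join-demo") // arbitrary demo name
        .master("local[*]")             // local mode, just for trying this out
        .getOrCreate();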
Because the two Datasets differ in both structure and row count, the way to stitch them into one wide Dataset is to give each of them an extra auto-incrementing numbering column. The quickest such number is each row's index, which requires switching to the RDD API:
JavaPairRDD<Long, Row> aNewRDD = a.toJavaRDD().zipWithIndex().mapToPair(tuple -> {
    Long key = tuple._2;  // the row's index becomes the key
    Row val = tuple._1;   // the original Row becomes the value
    return new Tuple2<>(key, val);
});
JavaPairRDD<Long, Row> bNewRDD = b.toJavaRDD().zipWithIndex().mapToPair(tuple -> {
    Long key = tuple._2;
    Row val = tuple._1;
    return new Tuple2<>(key, val);
});
zipWithIndex() natively yields a JavaPairRDD&lt;Row, Long&gt;, but we flip the structure into JavaPairRDD&lt;Long, Row&gt; to prepare for the subsequent join, because join matches records on the key field T of a JavaPairRDD&lt;T, M&gt;.
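As an aside, scala.Tuple2 already ships a swap() method, so the same key/value flip can be written more compactly (an equivalent sketch, not from the original post):

JavaPairRDD<Long, Row> aNewRDD = a.toJavaRDD().zipWithIndex().mapToPair(Tuple2::swap); // (Row, index) -> (index, Row)
JavaPairRDD<Long, Row> bNewRDD = b.toJavaRDD().zipWithIndex().mapToPair(Tuple2::swap);

With both sides keyed by the row index, the join itself is a one-liner: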
JavaPairRDD<Long, Tuple2<Row, Row>> joinRDD = aNewRDD.join(bNewRDD);
joinRDD.collect().forEach(x -> System.out.println(x));
(0,([7056,zl,1,hy,个人,2],[sw7056,66661,125]))
(1,([7057,cf,1,hy,个人,12],[es7057,66661,156]))
(2,([7058,hs,2,hy,个人,1200],[wq7058,66661,165]))
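Note that join is an inner join: test.csv has six rows but test1.csv only three, so the rows of a with indexes 3 to 5 are silently dropped, as the output above shows. If the longer side must be kept, leftOuterJoin is the usual substitute (a sketch; the right side then arrives wrapped in org.apache.spark.api.java.Optional):

// Rows of a without a partner in b get Optional.empty() on the right.
JavaPairRDD<Long, Tuple2<Row, Optional<Row>>> leftJoinRDD = aNewRDD.leftOuterJoin(bNewRDD);

The merge step below would then need to handle the empty case, e.g. by filling the missing columns with nulls.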
Next, the two Rows inside each Tuple2&lt;Row, Row&gt; must be merged into a single Row:
JavaRDD<Row> rtRDD = joinRDD.map(tuple -> {
    List<Object> rowContent = new ArrayList<>();
    // Convert each Row's Scala Seq of values into a Java List.
    List<Object> tp1 = JavaConverters.seqAsJavaList(tuple._2._1.toSeq());
    List<Object> tp2 = JavaConverters.seqAsJavaList(tuple._2._2.toSeq());
    rowContent.addAll(tp1);
    rowContent.addAll(tp2);
    // Convert the combined values back into a Scala Seq and build one flat Row.
    Seq<Object> rtSeq = JavaConverters.asScalaIteratorConverter(rowContent.iterator()).asScala().toSeq();
    return Row.fromSeq(rtSeq);
});
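The round-trip through Scala's Seq can also be skipped entirely: Spark's org.apache.spark.sql.RowFactory builds a Row straight from a Java Object array, so an equivalent sketch of the merge is:

// Equivalent merge using RowFactory instead of Row.fromSeq.
JavaRDD<Row> mergedRDD = joinRDD.map(tuple -> {
    List<Object> rowContent = new ArrayList<>();
    rowContent.addAll(JavaConverters.seqAsJavaList(tuple._2._1.toSeq()));
    rowContent.addAll(JavaConverters.seqAsJavaList(tuple._2._2.toSeq()));
    return RowFactory.create(rowContent.toArray()); // varargs Object... accepts the array
});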
This turns the JavaPairRDD&lt;Long, Tuple2&lt;Row, Row&gt;&gt; into a JavaRDD&lt;Row&gt;, which in turn has to become a Dataset&lt;Row&gt;. Since the program cannot know the result's structure on its own, we must build the schema that describes it:
StructType schema = new StructType(new StructField[]{
        new StructField("client_id", DataTypes.StringType, false, Metadata.empty()),
        new StructField("behives", DataTypes.StringType, false, Metadata.empty()),
        new StructField("del", DataTypes.StringType, false, Metadata.empty()),
        new StructField("normal_status", DataTypes.StringType, false, Metadata.empty()),
        new StructField("cust_type", DataTypes.StringType, false, Metadata.empty()),
        new StructField("no_trd_days", DataTypes.StringType, false, Metadata.empty()),
        new StructField("cust_no", DataTypes.StringType, false, Metadata.empty()),
        new StructField("code", DataTypes.StringType, false, Metadata.empty()),
        new StructField("hight", DataTypes.StringType, false, Metadata.empty())
});
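As an aside: because both files are read with header = "true" and no schema inference, every input column is already a string, so the same nine StringType fields can be derived by concatenating the two input schemas instead of typing them out (a sketch using the a and b Datasets from above; note that columns read from CSV are nullable, whereas the hand-written schema declares nullable = false):

// Concatenate the field arrays of the two input schemas
// (uses java.util.Arrays and java.util.stream.Stream).
StructField[] mergedFields = Stream.concat(
        Arrays.stream(a.schema().fields()),
        Arrays.stream(b.schema().fields()))
    .toArray(StructField[]::new);
StructType derivedSchema = new StructType(mergedFields);

Either way, with the schema and the merged rows in hand, the final Dataset can be created: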
Dataset<Row> dataView = session.createDataFrame(rtRDD, schema);
dataView.show();
The final result is:
+---------+-------+---+-------------+---------+-----------+-------+-----+-----+
|client_id|behives|del|normal_status|cust_type|no_trd_days|cust_no| code|hight|
+---------+-------+---+-------------+---------+-----------+-------+-----+-----+
| 7056| zl| 1| hy| 个人| 2| sw7056|66661| 125|
| 7057| cf| 1| hy| 个人| 12| es7057|66661| 156|
| 7058| hs| 2| hy| 个人| 1200| wq7058|66661| 165|
+---------+-------+---+-------------+---------+-----------+-------+-----+-----+
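For comparison, the same positional stitch is sometimes attempted purely in the DataFrame API with functions.monotonically_increasing_id(). A word of caution: those ids are guaranteed to be increasing but not consecutive across partitions, so the variant below only lines rows up reliably when each input is squeezed into a single partition first (a sketch, not from the original post):

// Only safe with one partition per side: coalesce(1) makes the ids 0..n-1.
Dataset<Row> aIdx = a.coalesce(1).withColumn("idx", functions.monotonically_increasing_id());
Dataset<Row> bIdx = b.coalesce(1).withColumn("idx", functions.monotonically_increasing_id());
Dataset<Row> stitched = aIdx.join(bIdx, "idx").drop("idx");

For data of any real size the zipWithIndex approach above is the safer choice, which is why the complete listing below sticks with it.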
The complete code, including the imports the snippets rely on:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public void createView(SparkSession session, String portraitPath, String prodPath) {
    // The demo hardcodes its two input files; the portraitPath and prodPath
    // parameters are kept from the original signature but are unused here.
    Dataset<Row> a = createRealView(session, "file:///C:\\test.csv");
    a.createOrReplaceTempView("a");
    Dataset<Row> b = createRealView(session, "file:///C:\\test1.csv");
    b.createOrReplaceTempView("b");

    // Index every row, then flip (Row, index) into (index, Row) so the index is the join key.
    JavaPairRDD<Long, Row> aNewRDD = a.toJavaRDD().zipWithIndex().mapToPair(tuple -> {
        Long key = tuple._2;
        Row val = tuple._1;
        return new Tuple2<>(key, val);
    });
    JavaPairRDD<Long, Row> bNewRDD = b.toJavaRDD().zipWithIndex().mapToPair(tuple -> {
        Long key = tuple._2;
        Row val = tuple._1;
        return new Tuple2<>(key, val);
    });

    // Inner join on the row index.
    JavaPairRDD<Long, Tuple2<Row, Row>> joinRDD = aNewRDD.join(bNewRDD);
    joinRDD.collect().forEach(x -> System.out.println(x));

    // Schema of the merged wide table (all CSV columns load as strings).
    StructType schema = new StructType(new StructField[]{
            new StructField("client_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("behives", DataTypes.StringType, false, Metadata.empty()),
            new StructField("del", DataTypes.StringType, false, Metadata.empty()),
            new StructField("normal_status", DataTypes.StringType, false, Metadata.empty()),
            new StructField("cust_type", DataTypes.StringType, false, Metadata.empty()),
            new StructField("no_trd_days", DataTypes.StringType, false, Metadata.empty()),
            new StructField("cust_no", DataTypes.StringType, false, Metadata.empty()),
            new StructField("code", DataTypes.StringType, false, Metadata.empty()),
            new StructField("hight", DataTypes.StringType, false, Metadata.empty())
    });

    // Merge the two Rows of each joined pair into a single flat Row.
    JavaRDD<Row> rtRDD = joinRDD.map(tuple -> {
        List<Object> rowContent = new ArrayList<>();
        List<Object> tp1 = JavaConverters.seqAsJavaList(tuple._2._1.toSeq());
        List<Object> tp2 = JavaConverters.seqAsJavaList(tuple._2._2.toSeq());
        rowContent.addAll(tp1);
        rowContent.addAll(tp2);
        Seq<Object> rtSeq = JavaConverters.asScalaIteratorConverter(rowContent.iterator()).asScala().toSeq();
        return Row.fromSeq(rtSeq);
    });

    Dataset<Row> dataView = session.createDataFrame(rtRDD, schema);
    dataView.show();
}

private Dataset<Row> createRealView(SparkSession session, String hdfsPath) {
    Dataset<Row> wideTableDF = null;
    try {
        wideTableDF = session.read().format("csv").option("header", "true").load(hdfsPath);
    } catch (Exception e) {
        System.err.println("No valid wide-table data was found; searched path: " + hdfsPath);
    }
    return wideTableDF;
}
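To run the example end to end, the two methods can be dropped into a small driver class (a sketch; the class name ZipIndexJoinDemo and the null path arguments are placeholders, since createView hardcodes its input files):

public class ZipIndexJoinDemo {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("zip-index-join-demo") // arbitrary demo name
                .master("local[*]")             // local mode for the demo
                .getOrCreate();
        new ZipIndexJoinDemo().createView(session, null, null); // paths unused in the demo
        session.stop();
    }
    // createView(...) and createRealView(...) from above go here.
}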
Original article: https://blog.csdn.net/qq_40623672/article/details/134804819