目录

1.Spark SQL 简介

2.SparkSession

3.Spark SQL 数据的读写

3.1 读写 TXT 文件

3.2 读写 CSV 文件

3.3 读写 JSON 文件

3.4 读写 Parquet 文件

3.5 读写 ORC 文件

3.6 读写MySQL数据库

4.Spark SQL 语法

4.1 SQL 语法

4.2 DSL 语法


1.Spark SQL 简介

        Spark SQL是Apache Spark一个模块用于处理结构化数据。它提供了两种编程抽象:DataFrame和DataSet,并作为分布式SQL查询引擎

        DataFrame是一个分布式数据集合,类似于传统数据库中的表,带有schema信息。DataFrame是以列式格式进行存储的,每一列可以不同数据类型(如整数字符串等),并且每一列都有一个名称

        Dataset是在Spark 1.6中新增一个接口,它提供了RDD(强类型可以使用强大的lambda函数)的优点,以及Spark SQL优化执行引擎的优点。数据集可以从JVM对象构建然后使用函数转换mapflatMapfilter等)进行操作

2.SparkSession

        SparkSession是Spark 2.0版本引入的新概念,它是对SparkContext升级扩展。SparkSession提供了更加统一的API(在Spark 1.x版本中,每个API都需要一个对应的Context例如StreamingContext、SQLContext、HiveContext等),包括SQL查询、数据转换、数据读写注册数据源管理配置操作
        SparkSession是Spark 应用程序的新入口点,要创建一个基本的SparkSession,只需使用SparkSession.builder即可

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.SparkSession;

public class SparkSqlDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        spark.stop();
    }
}
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Demo")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    spark.stop
  }

}

3.Spark SQL 数据的读写

3.1 读写 TXT 文件

        Spark SQL提供了spark.read().text(“file_name“)来将文本文件目录下的文本文件读取到Spark DataFrame中,以及dataframe.write().text(“path“)来将数据写入文本文件。在读取文本文件时,每一行默认成为具有字符串value”列的一行。在写入文本文件时以及dataframe对象中的每一行也只能有一列,否则会提示“Text data source supports only a single column, and you have 2 columns.”错误

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlTxtDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL TXT Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //读取txt文件
        Dataset<Row&gt; df1 = spark.read()
//                .option("lineSep", ",")//自定义分隔符:默认是`r`, `rn` and `n`
                .text("F:\24\spark\data\people.txt");
        df1.show();
        //读取txt文件的另一种方式
        Dataset<Row&gt; df2 = spark.read().format("text")
               .load("F:\24\spark\data\people.txt");
        df2.show();

        //写入txt文件
        df1.write().text("F:\24\spark\data\people_1.txt");
        //写入txt文件的另外一种方式
        df2.write().format("text").save("F:\24\spark\data\people_2.txt");
        spark.stop();
    }
}
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlTxtDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL TXT Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取txt文件
    val df1 = spark.read.text("F:\24\spark\data\people.txt")
    df1.show
    //读取txt文件的另一种方式
    val df2 = spark.read.format("text").load("F:\24\spark\data\people.txt")
    df2.show

    //写入txt文件
    df1.write.text("F:\24\spark\data\people_1.txt")
    //写入txt文件的另一种方式
    df2.write.format("text").save("F:\24\spark\data\people_2.txt")
    
    spark.stop
  }

}

3.2 读写 CSV 文件

        Spark SQL提供了spark.read().csv(“file_name“)将CSV格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().csv(“path“)将数据写入CSV文件。

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlCsvDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL CSV Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //读取CSV文件:CSV带有表头
        Dataset<Row> df1 = spark.read()
               .option("header", "true")//设置是否表头
                .option("delimiter", ";") //设置分隔符,默认逗号
               .csv("F:\24\spark\data\people.csv");
        df1.show();
        //读取CSV文件:CSV带有表头
        //csv文件的另外一种读取方式
        Dataset<Row> df2 = spark.read().format("csv")
                .option("header", "true")//设置是否有表头
                .option("delimiter", ";") //设置分隔符,默认逗号
               .load("F:\24\spark\data\people.csv");
        df2.show();

        //写入CSV文件
        df1.write().csv("F:\24\spark\data\people_1.csv");
        //写入CSV文件的另外一种方式
        df2.write().format("csv").save("F:\24\spark\data\people_2.csv");
        spark.stop();
    }
}
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlCsvDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL CSV Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取CSV文件
    val df1 = spark.read
      .option("header", "true")//是否有表头
      .option("delimiter", ";") //分隔符
      .csv("F:\24\spark\data\people.csv")
    df1.show
    //读取CSV文件的另一种方式
    val df2 = spark.read.format("csv")
      .option("header", "true") //是否有表头
      .option("delimiter", ";") //分隔符
      .load("F:\24\spark\data\people.csv")
    df2.show

    //写入csv文件
    df1.write.csv("F:\24\spark\data\people_1.csv")
    //写入csv文件的另一种方式
    df2.write.format("csv").save("F:\24\spark\data\people_2.csv")

    spark.stop
  }

}

3.3 读写 JSON 文件

        Spark SQL提供了spark.read().json(“file_name“)将json格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().json(“path“)将数据写入json文件。

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlJsonDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL JSON Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //读取JSON文件
        Dataset<Row> df1 = spark.read()
               .json("F:\24\spark\data\people.json");
        df1.show();
        //读取JSON文件
        //JSON文件的另外一种读取方式
        Dataset<Row> df2 = spark.read().format("json")
               .load("F:\24\spark\data\people.json");
        df2.show();

        //写入JSON文件
        df1.write().json("F:\24\spark\data\people_1.json");
        //写入JSON文件的另外一种方式
        df2.write().format("csv").save("F:\24\spark\data\people_2.json");
        spark.stop();
    }
}
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlJsonDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL JSON Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取JSON文件
    val df1 = spark.read
      .json("F:\24\spark\data\people.json")
    df1.show
    //读取JSON文件的另一种方式
    val df2 = spark.read.format("json")
      .load("F:\24\spark\data\people.json")
    df2.show

    //写入JSON文件
    df1.write.json("F:\24\spark\data\people_1.json")
    //写入JSON件的另一种方式
    df2.write.format("json").save("F:\24\spark\data\people_2.json")

    spark.stop
  }

}

3.4 读写 Parquet 文件

        Parquet 是一种通用的列式存储格式,Parquet已广泛应用于Hadoop生态圈(MapReduce、Spark、Hive、Impala),是离线数仓的主要数据格式。Spark SQL提供了spark.read().parquet(“file_name“)将parquet格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().parquet(“path“)将数据写入parquet文件。

package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlParquetDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Parquet Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //读取Parquet文件
        Dataset<Row> df1 = spark.read()
               .parquet("F:\24\spark\data\users.parquet");
        df1.show();
        //Parquet件的另外一种读取方式
        Dataset<Row> df2 = spark.read().format("parquet")
               .load("F:\24\spark\data\users.parquet");
        df2.show();

        //写入Parquet文件
        df1.write().parquet("F:\24\spark\data\users_1.parquet");
        //写入Parquet文件的另外一种方式
        df2.write().format("parquet").save("F:\24\spark\data\users_2.parquet");
        spark.stop();
    }
}
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlParquetDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Parquet Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取Parquet文件
    val df1 = spark.read
      .parquet("F:\24\spark\data\users.parquet")
    df1.show
    //读取Parquet文件的另一种方式
    val df2 = spark.read.format("parquet")
      .load("F:\24\spark\data\users.parquet")
    df2.show

    //写入Parquet文件
    df1.write.parquet("F:\24\spark\data\users_1.parquet")
    //写入Parquet件的另一种方式
    df2.write.format("parquet").save("F:\24\spark\data\users_2.parquet")

    spark.stop
  }

}

3.5 读写 ORC 文件

        ORC是一种列式存储结构,ORC最早产生于Apache Hive,用于减小Hadoop文件的存储空间和加速Hive查询。和Parquet一样,ORC也广泛应用于Hadoop生态圈。Spark SQL提供了spark.read().orc(“file_name“)将orc格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().orc(“path“)将数据写入orc文件。

  •  Java代码实现:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlOrcDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL ORC Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        //读取ORC文件
        Dataset<Row> df1 = spark.read()
               .orc("F:\24\spark\data\users.orc");
        df1.show();
        //ORC文件的另外一种读取方式
        Dataset<Row> df2 = spark.read().format("orc")
               .load("F:\24\spark\data\users.orc");
        df2.show();

        //写入ORC文件
        df1.write().orc("F:\24\spark\data\users_1.orc");
        //写入ORC文件的另外一种方式
        df2.write().format("orc").save("F:\24\spark\data\users_2.orc");

        spark.stop();
    }
}
  • Scala代码实现:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlOrcDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL ORC Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取ORC文件
    val df1 = spark.read
      .orc("F:\24\spark\data\users.orc")
    df1.show
    //读取ORC文件的另一种方式
    val df2 = spark.read.format("orc")
      .load("F:\24\spark\data\users.orc")
    df2.show

    //写入ORC文件
    df1.write.orc("F:\24\spark\data\users_1.orc")
    //写入ORC件的另一种方式
    df2.write.format("orc").save("F:\24\spark\data\users_2.orc")

    spark.stop
  }

}

3.6 读写MySQL数据库

        Spark SQL 可以通过 JDBC 从关系数据库读取数据的方式创建 DataFrame,通过对DataFrame 一系列计算后,还可以将数据再写回关系数据库中。

  • Java代码实现:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkSqlMySQLDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL MySQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        String url = "jdbc:mysql://localhost:3307/demo?useSSL=false";
        String user = "root";
        String password = "wsx-123";
        String driver = "com.mysql.cj.jdbc.Driver";
        String table = "demo.dim_area";

        Properties properties = new Properties();
        properties.setProperty("user",user);
        properties.setProperty("password",password);
        properties.setProperty("driver",driver);
        properties.setProperty("url",url);
        properties.setProperty("useUnicode","true");
        properties.setProperty("characterEncoding","utf-8");

        //读取MySQL文件
        Dataset<Row> df1 = spark.read()
                        .jdbc(url,table,properties);
        df1.show(2);
        //MySQL的另外一种读取方式
        Dataset<Row> df2 = spark.read().format("jdbc")
                .option("url", url)
                .option("dbtable", table)
                .option("user", user)
                .option("password", password)
               .load();
        df2.show(3);

//        //写入MySQL文件
        df1.write().mode(SaveMode.Overwrite).jdbc(url,"dim_area_1",properties);
//        //写入MySQL的另外一种方式
        df2.write().mode(SaveMode.Overwrite).format("jdbc")
                .option("url", url)
                .option("dbtable", "dim_area_2")
                .option("user", user)
                .option("password", password)
                .save();

        spark.stop();
    }
}
  • Scala代码实现:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

import java.util.Properties

object SparkSqlMySQLDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL MySQL Demo")
      //      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    val url = "jdbc:mysql://localhost:3307/demo?useSSL=false"
    val user = "root"
    val password = "wsx-123"
    val driver = "com.mysql.cj.jdbc.Driver"
    val table = "demo.dim_area"

    val properties = new Properties
    properties.setProperty("user", user)
    properties.setProperty("password", password)
    properties.setProperty("driver", driver)
    properties.setProperty("url", url)
    properties.setProperty("useUnicode", "true")
    properties.setProperty("characterEncoding", "utf-8")

    //读取MySQL文件
    val df1 = spark.read.jdbc(url, table, properties)
    df1.show(2)
    //MySQL的另外一种读取方式
    val df2 = spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", user)
      .option("password", password)
      .load

    df2.show(3)

    //写入MySQL文件
    df1.write.mode(SaveMode.Overwrite).jdbc(url, "dim_area_1", properties)
    //写入MySQL的另外一种方式
    df2.write.mode(SaveMode.Overwrite).format("jdbc")
      .option("url", url)
      .option("dbtable", "dim_area_2")
      .option("user", user)
      .option("password", password)
      .save

    spark.stop
  }

}

        注意通过jdbc方式写入mysql数据库是,dataframe中的字段保存表中必须存在,否则报错

4.Spark SQL 语法

4.1 SQL 语法

        SparkSession中的sql函数允许应用程序编程运行SQL查询,并将结果作为DataFrame返回。这种方式的查询必须要有临时视图或者全局视图辅助

  • Java代码实现:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSqlDemo {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL Demo")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //读取CSV文件:CSV带有表头
        Dataset<Row> df1 = spark.read()
                .option("header", "true")//设置是否有表头
                .option("delimiter", ";") //设置分隔符,默认逗号
                .csv("F:\24\spark\data\people.csv");
        df1.show();
        //创建临时视图
        df1.createOrReplaceTempView("people");
        //创建全局临时视图
         df1.createGlobalTempView("people");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 30|Developer|
        //|  Bob| 32|Developer|
        //+-----+---+---------+
        //全局临时视图保存global_temp数据库中,使用全局临时视图时需要路径访问,如:global_temp.people
        Dataset<Row> sqlDF2 = spark.sql("SELECT name,age,job FROM global_temp.people WHERE age > 30");
        sqlDF2.show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|  Bob| 32|Developer|
        //+-----+---+---------+
        spark.stop();
    }
}
  • Scala代码实现:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL Demo")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取CSV文件
    val df1 = spark.read
      .option("header", "true") //是否有表头
      .option("delimiter", ";") //分隔符
      .csv("F:\24\spark\data\people.csv")
    df1.show
    // 创建临时视图
    df1.createOrReplaceTempView("people")
    // 创建全局临时视图
     df1.createGlobalTempView("people")
    //使用临时视图
    spark.sql("SELECT * FROM people").show()
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 30|Developer|
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    // 全局临时视图保存global_temp数据库中,使用全局临时视图时需要路径访问,如:global_temp.people
    spark.sql("SELECT name,age,job FROM global_temp.people where age > 30").show()
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|  Bob| 32|Developer|
    //+-----+---+---------+
    spark.stop
  }

}

4.2 DSL 语法

        DataFrame DataFrame为Scala、Java、Python和R提供了针对结构化数据操作领域特定语言。使用 DSL 语法不必创建临时视图。

  • Java代码实现:
package com.yichenkeji.demo.sparkjava;

import org.apache.spark.sql.*;

import static org.apache.spark.sql.functions.col;
public class SparkSqlDSLDemo {
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Spark SQL DSL")
//                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        //读取CSV文件:CSV带有表头
        Dataset<Row> df1 = spark.read()
                .option("header", "true")//设置是否有表头
                .option("delimiter", ";") //设置分隔符,默认逗号
                .csv("F:\24\spark\data\people.csv");
        df1.show();
        // 打印字段信息
        df1.printSchema();
//		root
//		 |-- name: string (nullable = true)
//		 |-- age: string (nullable = true)
//		 |-- job: string (nullable = true)

        //查询指定字段
        df1.select("name","age","job").show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 30|Developer|
        //|  Bob| 32|Developer|
        //+-----+---+---------+

       //查询指定字段并设置别名,对查询的字段进行运算
        df1.select(col("name").alias("name")
                ,col("age").plus(1).alias("age")
                ,col("job"))
                .show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|Jorge| 31|Developer|
        //|  Bob| 33|Developer|
        //+-----+---+---------+

        //过滤数据
        df1.filter(col("age").gt(30)).show();
//        df1.where(col("age").gt(30)).show();
        //+-----+---+---------+
        //| name|age|      job|
        //+-----+---+---------+
        //|  Bob| 32|Developer|
        //+-----+---+---------+

        //分组聚合
        df1.groupBy("job").count().show();
        //+---------+-----+
        //|      job|count|
        //+---------+-----+
        //|Developer|    2|
        //+---------+-----+
        //复杂聚合
        df1.groupBy(col("job").alias("post")).agg(
                functions.count(col("name")).as("nums")
                ,functions.sum(col("age")).as("total_age")
                ,functions.avg(col("age")).as("aver_age")
                ).show();
        //+---------+----+---------+--------+
        //|     post|nums|total_age|aver_age|
        //+---------+----+---------+--------+
        //|Developer|   2|     62.0|    31.0|
        //+---------+----+---------+--------+
        spark.stop();
    }
}
  • Scala代码实现:
package com.yichenkeji.demo.sparkscala

import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.functions.col

object SparkSqlDSLDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL DSL")
//      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    //读取CSV文件
    val df1 = spark.read
      .option("header", "true") //是否有表头
      .option("delimiter", ";") //分隔符
      .csv("F:\24\spark\data\people.csv")
    df1.show

    //打印字段信息
    df1.printSchema
    //		root
    //		 |-- name: string (nullable = true)
    //		 |-- age: string (nullable = true)
    //		 |-- job: string (nullable = true)

    //查询指定字段
    df1.select("name", "age", "job").show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 30|Developer|
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    //查询指定字段并设置别名,对查询的字段进行运算
    import spark.implicits._
    df1.select($"name" as "name", ($"age" + 1) as "age", $"job" as "job").show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|Jorge| 31|Developer|
    //|  Bob| 33|Developer|
    //+-----+---+---------+

    //过滤数据
    df1.filter($"age" > 30).show
//    df1.where($"age" > 30).show
    //+-----+---+---------+
    //| name|age|      job|
    //+-----+---+---------+
    //|  Bob| 32|Developer|
    //+-----+---+---------+

    //分组聚合
    df1.groupBy("job").count.show
    //+---------+-----+
    //|      job|count|
    //+---------+-----+
    //|Developer|    2|
    //+---------+-----+
    //复杂聚合
    df1.groupBy(col("job").alias("post")).agg(
      functions.count(col("name")).as("nums")
      , functions.sum(col("age")).as("total_age")
      , functions.avg(col("age")).as("aver_age")
    ).show()
    //+---------+----+---------+--------+
    //|     post|nums|total_age|aver_age|
    //+---------+----+---------+--------+
    //|Developer|   2|     62.0|    31.0|
    //+---------+----+---------+--------+


    spark.stop
  }

}

原文地址:https://blog.csdn.net/m0_37559973/article/details/133686021

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_48662.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注