1.Spark SQL 简介
Spark SQL是Apache Spark的一个模块,用于处理结构化数据。它提供了两种编程抽象:DataFrame和DataSet,并作为分布式SQL查询引擎。
DataFrame是一个分布式的数据集合,类似于传统数据库中的表,带有schema信息。DataFrame是以列式格式进行存储的,每一列可以是不同的数据类型(如整数、字符串等),并且每一列都有一个名称。
Dataset是在Spark 1.6中新增的一个接口,它提供了RDD(强类型,可以使用强大的lambda函数)的优点,以及Spark SQL优化执行引擎的优点。数据集可以从JVM对象构建,然后使用函数式转换(map,flatMap,filter等)进行操作。
2.SparkSession
SparkSession是Spark 2.0版本引入的新概念,它是对SparkContext的升级和扩展。SparkSession提供了更加统一的API(在Spark 1.x版本中,每个API都需要一个对应的Context:例如StreamingContext、SQLContext、HiveContext等),包括SQL查询、数据转换、数据读写、注册数据源、管理配置等操作。
SparkSession是Spark 应用程序的新入口点,要创建一个基本的SparkSession,只需使用SparkSession.builder即可:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.SparkSession;
public class SparkSqlDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
spark.stop();
}
}
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
spark.stop
}
}
3.Spark SQL 数据的读写
3.1 读写 TXT 文件
Spark SQL提供了spark.read().text(“file_name“)来将文本文件或目录下的文本文件读取到Spark DataFrame中,以及dataframe.write().text(“path“)来将数据写入文本文件。在读取文本文件时,每一行默认成为具有字符串“value”列的一行。在写入文本文件时以及dataframe对象中的每一行也只能有一列,否则会提示“Text data source supports only a single column, and you have 2 columns.”错误。
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlTxtDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL TXT Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取txt文件
Dataset<Row> df1 = spark.read()
// .option("lineSep", ",")//自定义行分隔符:默认是`r`, `rn` and `n`
.text("F:\24\spark\data\people.txt");
df1.show();
//读取txt文件的另一种方式
Dataset<Row> df2 = spark.read().format("text")
.load("F:\24\spark\data\people.txt");
df2.show();
//写入txt文件
df1.write().text("F:\24\spark\data\people_1.txt");
//写入txt文件的另外一种方式
df2.write().format("text").save("F:\24\spark\data\people_2.txt");
spark.stop();
}
}
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlTxtDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL TXT Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取txt文件
val df1 = spark.read.text("F:\24\spark\data\people.txt")
df1.show
//读取txt文件的另一种方式
val df2 = spark.read.format("text").load("F:\24\spark\data\people.txt")
df2.show
//写入txt文件
df1.write.text("F:\24\spark\data\people_1.txt")
//写入txt文件的另一种方式
df2.write.format("text").save("F:\24\spark\data\people_2.txt")
spark.stop
}
}
3.2 读写 CSV 文件
Spark SQL提供了spark.read().csv(“file_name“)将CSV格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().csv(“path“)将数据写入CSV文件。
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlCsvDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL CSV Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取CSV文件:CSV带有表头
Dataset<Row> df1 = spark.read()
.option("header", "true")//设置是否有表头
.option("delimiter", ";") //设置分隔符,默认是逗号
.csv("F:\24\spark\data\people.csv");
df1.show();
//读取CSV文件:CSV带有表头
//csv文件的另外一种读取方式
Dataset<Row> df2 = spark.read().format("csv")
.option("header", "true")//设置是否有表头
.option("delimiter", ";") //设置分隔符,默认是逗号
.load("F:\24\spark\data\people.csv");
df2.show();
//写入CSV文件
df1.write().csv("F:\24\spark\data\people_1.csv");
//写入CSV文件的另外一种方式
df2.write().format("csv").save("F:\24\spark\data\people_2.csv");
spark.stop();
}
}
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlCsvDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL CSV Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取CSV文件
val df1 = spark.read
.option("header", "true")//是否有表头
.option("delimiter", ";") //分隔符
.csv("F:\24\spark\data\people.csv")
df1.show
//读取CSV文件的另一种方式
val df2 = spark.read.format("csv")
.option("header", "true") //是否有表头
.option("delimiter", ";") //分隔符
.load("F:\24\spark\data\people.csv")
df2.show
//写入csv文件
df1.write.csv("F:\24\spark\data\people_1.csv")
//写入csv文件的另一种方式
df2.write.format("csv").save("F:\24\spark\data\people_2.csv")
spark.stop
}
}
3.3 读写 JSON 文件
Spark SQL提供了spark.read().json(“file_name“)将json格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().json(“path“)将数据写入json文件。
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlJsonDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL JSON Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取JSON文件
Dataset<Row> df1 = spark.read()
.json("F:\24\spark\data\people.json");
df1.show();
//读取JSON文件
//JSON文件的另外一种读取方式
Dataset<Row> df2 = spark.read().format("json")
.load("F:\24\spark\data\people.json");
df2.show();
//写入JSON文件
df1.write().json("F:\24\spark\data\people_1.json");
//写入JSON文件的另外一种方式
df2.write().format("csv").save("F:\24\spark\data\people_2.json");
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlJsonDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL JSON Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取JSON文件
val df1 = spark.read
.json("F:\24\spark\data\people.json")
df1.show
//读取JSON文件的另一种方式
val df2 = spark.read.format("json")
.load("F:\24\spark\data\people.json")
df2.show
//写入JSON文件
df1.write.json("F:\24\spark\data\people_1.json")
//写入JSON件的另一种方式
df2.write.format("json").save("F:\24\spark\data\people_2.json")
spark.stop
}
}
3.4 读写 Parquet 文件
Parquet 是一种通用的列式存储格式,Parquet已广泛应用于Hadoop生态圈(MapReduce、Spark、Hive、Impala),是离线数仓的主要数据格式。Spark SQL提供了spark.read().parquet(“file_name“)将parquet格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().parquet(“path“)将数据写入parquet文件。
- Java代码实现:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlParquetDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Parquet Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取Parquet文件
Dataset<Row> df1 = spark.read()
.parquet("F:\24\spark\data\users.parquet");
df1.show();
//Parquet件的另外一种读取方式
Dataset<Row> df2 = spark.read().format("parquet")
.load("F:\24\spark\data\users.parquet");
df2.show();
//写入Parquet文件
df1.write().parquet("F:\24\spark\data\users_1.parquet");
//写入Parquet文件的另外一种方式
df2.write().format("parquet").save("F:\24\spark\data\users_2.parquet");
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlParquetDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Parquet Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取Parquet文件
val df1 = spark.read
.parquet("F:\24\spark\data\users.parquet")
df1.show
//读取Parquet文件的另一种方式
val df2 = spark.read.format("parquet")
.load("F:\24\spark\data\users.parquet")
df2.show
//写入Parquet文件
df1.write.parquet("F:\24\spark\data\users_1.parquet")
//写入Parquet件的另一种方式
df2.write.format("parquet").save("F:\24\spark\data\users_2.parquet")
spark.stop
}
}
3.5 读写 ORC 文件
ORC是一种列式存储结构,ORC最早产生于Apache Hive,用于减小Hadoop文件的存储空间和加速Hive查询。和Parquet一样,ORC也广泛应用于Hadoop生态圈。Spark SQL提供了spark.read().orc(“file_name“)将orc格式的文件或文件夹读取到Spark DataFrame中,以及dataframe.write().orc(“path“)将数据写入orc文件。
- Java代码实现:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlOrcDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL ORC Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取ORC文件
Dataset<Row> df1 = spark.read()
.orc("F:\24\spark\data\users.orc");
df1.show();
//ORC文件的另外一种读取方式
Dataset<Row> df2 = spark.read().format("orc")
.load("F:\24\spark\data\users.orc");
df2.show();
//写入ORC文件
df1.write().orc("F:\24\spark\data\users_1.orc");
//写入ORC文件的另外一种方式
df2.write().format("orc").save("F:\24\spark\data\users_2.orc");
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlOrcDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL ORC Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取ORC文件
val df1 = spark.read
.orc("F:\24\spark\data\users.orc")
df1.show
//读取ORC文件的另一种方式
val df2 = spark.read.format("orc")
.load("F:\24\spark\data\users.orc")
df2.show
//写入ORC文件
df1.write.orc("F:\24\spark\data\users_1.orc")
//写入ORC件的另一种方式
df2.write.format("orc").save("F:\24\spark\data\users_2.orc")
spark.stop
}
}
3.6 读写MySQL数据库
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。
- Java代码实现:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
public class SparkSqlMySQLDemo {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL MySQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
String url = "jdbc:mysql://localhost:3307/demo?useSSL=false";
String user = "root";
String password = "wsx-123";
String driver = "com.mysql.cj.jdbc.Driver";
String table = "demo.dim_area";
Properties properties = new Properties();
properties.setProperty("user",user);
properties.setProperty("password",password);
properties.setProperty("driver",driver);
properties.setProperty("url",url);
properties.setProperty("useUnicode","true");
properties.setProperty("characterEncoding","utf-8");
//读取MySQL文件
Dataset<Row> df1 = spark.read()
.jdbc(url,table,properties);
df1.show(2);
//MySQL的另外一种读取方式
Dataset<Row> df2 = spark.read().format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load();
df2.show(3);
// //写入MySQL文件
df1.write().mode(SaveMode.Overwrite).jdbc(url,"dim_area_1",properties);
// //写入MySQL的另外一种方式
df2.write().mode(SaveMode.Overwrite).format("jdbc")
.option("url", url)
.option("dbtable", "dim_area_2")
.option("user", user)
.option("password", password)
.save();
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import java.util.Properties
object SparkSqlMySQLDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL MySQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
val url = "jdbc:mysql://localhost:3307/demo?useSSL=false"
val user = "root"
val password = "wsx-123"
val driver = "com.mysql.cj.jdbc.Driver"
val table = "demo.dim_area"
val properties = new Properties
properties.setProperty("user", user)
properties.setProperty("password", password)
properties.setProperty("driver", driver)
properties.setProperty("url", url)
properties.setProperty("useUnicode", "true")
properties.setProperty("characterEncoding", "utf-8")
//读取MySQL文件
val df1 = spark.read.jdbc(url, table, properties)
df1.show(2)
//MySQL的另外一种读取方式
val df2 = spark.read.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load
df2.show(3)
//写入MySQL文件
df1.write.mode(SaveMode.Overwrite).jdbc(url, "dim_area_1", properties)
//写入MySQL的另外一种方式
df2.write.mode(SaveMode.Overwrite).format("jdbc")
.option("url", url)
.option("dbtable", "dim_area_2")
.option("user", user)
.option("password", password)
.save
spark.stop
}
}
注意:通过jdbc方式写入mysql数据库是,dataframe中的字段在保存的表中必须存在,否则报错。
4.Spark SQL 语法
4.1 SQL 语法
SparkSession中的sql函数允许应用程序编程地运行SQL查询,并将结果作为DataFrame返回。这种方式的查询必须要有临时视图或者全局视图来辅助。
- Java代码实现:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSqlDemo {
public static void main(String[] args) throws AnalysisException {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取CSV文件:CSV带有表头
Dataset<Row> df1 = spark.read()
.option("header", "true")//设置是否有表头
.option("delimiter", ";") //设置分隔符,默认是逗号
.csv("F:\24\spark\data\people.csv");
df1.show();
//创建临时视图
df1.createOrReplaceTempView("people");
//创建全局临时视图
df1.createGlobalTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//| Bob| 32|Developer|
//+-----+---+---------+
//全局临时视图保存在global_temp数据库中,使用全局临时视图时需要全路径访问,如:global_temp.people
Dataset<Row> sqlDF2 = spark.sql("SELECT name,age,job FROM global_temp.people WHERE age > 30");
sqlDF2.show();
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//| Bob| 32|Developer|
//+-----+---+---------+
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.SparkSession
object SparkSqlDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL Demo")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取CSV文件
val df1 = spark.read
.option("header", "true") //是否有表头
.option("delimiter", ";") //分隔符
.csv("F:\24\spark\data\people.csv")
df1.show
// 创建临时视图
df1.createOrReplaceTempView("people")
// 创建全局临时视图
df1.createGlobalTempView("people")
//使用临时视图
spark.sql("SELECT * FROM people").show()
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//| Bob| 32|Developer|
//+-----+---+---------+
// 全局临时视图保存在global_temp数据库中,使用全局临时视图时需要全路径访问,如:global_temp.people
spark.sql("SELECT name,age,job FROM global_temp.people where age > 30").show()
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//| Bob| 32|Developer|
//+-----+---+---------+
spark.stop
}
}
4.2 DSL 语法
DataFrame DataFrame为Scala、Java、Python和R提供了针对结构化数据操作的领域特定语言。使用 DSL 语法不必创建临时视图。
- Java代码实现:
package com.yichenkeji.demo.sparkjava;
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.col;
public class SparkSqlDSLDemo {
public static void main(String[] args) throws AnalysisException {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL DSL")
// .config("spark.some.config.option", "some-value")
.getOrCreate();
//读取CSV文件:CSV带有表头
Dataset<Row> df1 = spark.read()
.option("header", "true")//设置是否有表头
.option("delimiter", ";") //设置分隔符,默认是逗号
.csv("F:\24\spark\data\people.csv");
df1.show();
// 打印字段信息
df1.printSchema();
// root
// |-- name: string (nullable = true)
// |-- age: string (nullable = true)
// |-- job: string (nullable = true)
//查询指定字段
df1.select("name","age","job").show();
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//| Bob| 32|Developer|
//+-----+---+---------+
//查询指定字段并设置别名,对查询的字段进行运算
df1.select(col("name").alias("name")
,col("age").plus(1).alias("age")
,col("job"))
.show();
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 31|Developer|
//| Bob| 33|Developer|
//+-----+---+---------+
//过滤数据
df1.filter(col("age").gt(30)).show();
// df1.where(col("age").gt(30)).show();
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//| Bob| 32|Developer|
//+-----+---+---------+
//分组聚合
df1.groupBy("job").count().show();
//+---------+-----+
//| job|count|
//+---------+-----+
//|Developer| 2|
//+---------+-----+
//复杂聚合
df1.groupBy(col("job").alias("post")).agg(
functions.count(col("name")).as("nums")
,functions.sum(col("age")).as("total_age")
,functions.avg(col("age")).as("aver_age")
).show();
//+---------+----+---------+--------+
//| post|nums|total_age|aver_age|
//+---------+----+---------+--------+
//|Developer| 2| 62.0| 31.0|
//+---------+----+---------+--------+
spark.stop();
}
}
- Scala代码实现:
package com.yichenkeji.demo.sparkscala
import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.functions.col
object SparkSqlDSLDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("Spark SQL DSL")
// .config("spark.some.config.option", "some-value")
.getOrCreate()
//读取CSV文件
val df1 = spark.read
.option("header", "true") //是否有表头
.option("delimiter", ";") //分隔符
.csv("F:\24\spark\data\people.csv")
df1.show
//打印字段信息
df1.printSchema
// root
// |-- name: string (nullable = true)
// |-- age: string (nullable = true)
// |-- job: string (nullable = true)
//查询指定字段
df1.select("name", "age", "job").show
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//| Bob| 32|Developer|
//+-----+---+---------+
//查询指定字段并设置别名,对查询的字段进行运算
import spark.implicits._
df1.select($"name" as "name", ($"age" + 1) as "age", $"job" as "job").show
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//|Jorge| 31|Developer|
//| Bob| 33|Developer|
//+-----+---+---------+
//过滤数据
df1.filter($"age" > 30).show
// df1.where($"age" > 30).show
//+-----+---+---------+
//| name|age| job|
//+-----+---+---------+
//| Bob| 32|Developer|
//+-----+---+---------+
//分组聚合
df1.groupBy("job").count.show
//+---------+-----+
//| job|count|
//+---------+-----+
//|Developer| 2|
//+---------+-----+
//复杂聚合
df1.groupBy(col("job").alias("post")).agg(
functions.count(col("name")).as("nums")
, functions.sum(col("age")).as("total_age")
, functions.avg(col("age")).as("aver_age")
).show()
//+---------+----+---------+--------+
//| post|nums|total_age|aver_age|
//+---------+----+---------+--------+
//|Developer| 2| 62.0| 31.0|
//+---------+----+---------+--------+
spark.stop
}
}
原文地址:https://blog.csdn.net/m0_37559973/article/details/133686021
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_48662.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!