《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南

spark-1.6.0 [原文地址]

Spark SQL, DataFrames 以及 Datasets 编程指南

概要

Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息和计算任务的运行时信息。在Spark内部,Spark SQL会能够用于做优化的信息比RDD API更多一些。Spark SQL如今有了三种不同的API:SQL语句、DataFrame API和最新的Dataset API。不过真正运行计算的时候,无论你使用哪种API或语言,Spark SQL使用的执行引擎都是同一个。这种底层的统一,使开发者可以在不同的API之间来回切换,你可以选择一种最自然的方式,来表达你的需求。

 

本文中所有的示例都使用Spark发布版本中自带的示例数据,并且可以在spark-shell、pyspark shell以及sparkR shell中运行。

SQL

Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。

DataFrames

DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。

DataFrame API支持ScalaJavaPython, and R

Datasets

Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。

Dataset API 对 Scala 和 Java的支持接口是一致的,但目前还不支持Python,不过Python自身就有语言动态特性优势(例如,你可以使用字段名来访问数据,row.columnName)。对Python的完整支持在未来的版本会增加进来。

入门

入口:SQLContext

Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。

val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隐式转换操作
import sqlContext.implicits._

除了SQLContext之外,你也可以创建HiveContext,HiveContext是SQLContext 的超集。

除了SQLContext的功能之外,HiveContext还提供了完整的HiveQL语法,UDF使用,以及对Hive表中数据的访问。要使用HiveContext,你并不需要安装Hive,而且SQLContext能用的数据源,HiveContext也一样能用。HiveContext是单独打包的,从而避免了在默认的Spark发布版本中包含所有的Hive依赖。如果这些依赖对你来说不是问题(不会造成依赖冲突等),建议你在Spark-1.3之前使用HiveContext。而后续的Spark版本,将会逐渐把SQLContext升级到和HiveContext功能差不多的状态。

spark.sql.dialect选项可以指定不同的SQL变种(或者叫SQL方言)。这个参数可以在SparkContext.setConf里指定,也可以通过 SQL语句的SET key=value命令指定。对于SQLContext,该配置目前唯一的可选值就是”sql”,这个变种使用一个Spark SQL自带的简易SQL解析器。而对于HiveContext,spark.sql.dialect 默认值为”hiveql”,当然你也可以将其值设回”sql”。仅就目前而言,HiveSQL解析器支持更加完整的SQL语法,所以大部分情况下,推荐使用HiveContext。

创建DataFrame

Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD),或者Hive表,或者其他数据源(data sources.)

以下是一个从JSON文件创建DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout
df.show()

DataFrame操作

DataFrame提供了结构化数据的领域专用语言支持,包括ScalaJavaPython and R.

这里我们给出一个结构化数据处理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展示 DataFrame 的内容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 计算各个年龄的人数
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

DataFrame的完整API列表请参考这里:API Documentation

除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。完整列表见这里:DataFrame Function Reference.

编程方式执行SQL查询

SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。

val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")

创建Dataset

Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。

// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 以下这行不仅定义了case class,同时也自动为其创建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

和RDD互操作

Spark SQL有两种方法将RDD转为DataFrame。

1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;

2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

利用反射推导schema

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

编程方式定义Schema

如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:

  1. 从已有的RDD创建一个包含Row对象的RDD
  2. 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
  3. 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame

For example:

例如:

// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)

数据源

Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的Spark数据源,然后深入介绍一下内建数据源可用选项。

通用加载/保存函数

在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定选项

你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接对文件使用SQL

Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。

仅Scala/Java 所有支持的语言 含义
SaveMode.ErrorIfExists (default) "error" (default) (默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。
SaveMode.Append "append" 如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。
SaveMode.Overwrite "overwrite" 如果数据或表已经存在,则用DataFrame数据覆盖之。
SaveMode.Ignore "ignore" 如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。

保存到持久化表

在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不同,saveAsTable会将DataFrame的实际数据内容保存下来,并且在HiveMetastore中创建一个游标指针。持久化的表会一直保留,即使Spark程序重启也没有影响,只要你连接到同一个metastore就可以读取其数据。读取持久化表时,只需要用用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。

默认情况下,saveAsTable会创建一个”managed table“,也就是说这个表数据的位置是由metastore控制的。同样,如果删除表,其数据也会同步删除。

Parquet文件

Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。

编程方式加载数据

仍然使用上面例子中的数据:

// 我们继续沿用之前例子中的sqlContext对象
// 为了支持RDD隐式转成DataFrame
import sqlContext.implicits._

val people: RDD[Person] = ... // 和上面例子中相同,一个包含case class对象的RDD

// 该RDD将隐式转成DataFrame,然后保存为parquet文件
people.write.parquet("people.parquet")

// 读取上面保存的Parquet文件(多个文件 - Parquet保存完其实是很多个文件)。Parquet文件是自描述的,文件中保存了schema信息
// 加载Parquet文件,并返回DataFrame结果
val parquetFile = sqlContext.read.parquet("people.parquet")

// Parquet文件(多个)可以注册为临时表,然后在SQL语句中直接查询
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区发现

像Hive这样的系统,一个很常用的优化手段就是表分区。在一个支持分区的表中,数据是保存在不同的目录中的,并且将分区键以编码方式保存在各个分区目录路径中。Parquet数据源现在也支持自动发现和推导分区信息。例如,我们可以把之前用的人口数据存到一个分区表中,其目录结构如下所示,其中有2个额外的字段,gender和country,作为分区键:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

在这个例子中,如果需要读取Parquet文件数据,我们只需要把 path/to/table 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load。Spark SQL能够自动的从路径中提取出分区信息,随后返回的DataFrame的schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

注意,分区键的数据类型将是自动推导出来的。目前,只支持数值类型和字符串类型数据作为分区键。

有的用户可能不想要自动推导出来的分区键数据类型。这种情况下,你可以通过 spark.sql.sources.partitionColumnTypeInference.enabled (默认是true)来禁用分区键类型推导。禁用之后,分区键总是被当成字符串类型。

从Spark-1.6.0开始,分区发现默认只在指定目录的子目录中进行。以上面的例子来说,如果用户把 path/to/table/gender=male 作为参数传给 SQLContext.read.parquet 或者 SQLContext.read.load,那么gender就不会被作为分区键。如果用户想要指定分区发现的基础目录,可以通过basePath选项指定。例如,如果把 path/to/table/gender=male作为数据目录,并且将basePath设为 path/to/table,那么gender仍然会最为分区键。

Schema合并

像ProtoBuffer、Avro和Thrift一样,Parquet也支持schema演变。用户从一个简单的schema开始,逐渐增加所需的新字段。这样的话,用户最终会得到多个schema不同但互相兼容的Parquet文件。目前,Parquet数据源已经支持自动检测这种情况,并合并所有文件的schema。

因为schema合并相对代价比较大,并且在多数情况下不是必要的,所以从Spark-1.5.0之后,默认是被禁用的。你可以这样启用这一功能:

  1. 读取Parquet文件时,将选项mergeSchema设为true(见下面的示例代码)
  2. 或者,将全局选项spark.sql.parquet.mergeSchema设为true
// 继续沿用之前的sqlContext对象
// 为了支持RDD隐式转换为DataFrame
import sqlContext.implicits._

// 创建一个简单的DataFrame,存到一个分区目录中
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// 创建另一个DataFrame放到新的分区目录中,
// 并增加一个新字段,丢弃一个老字段
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// 最终的schema将由3个字段组成(single,double,triple)
// 并且分区键出现在目录路径中
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metastore Parquet table转换

在读写Hive metastore Parquet 表时,Spark SQL用的是内部的Parquet支持库,而不是Hive SerDe,因为这样性能更好。这一行为是由spark.sql.hive.convertMetastoreParquet 配置项来控制的,而且默认是启用的。

Hive/Parquet schema调和

Hive和Parquet在表结构处理上主要有2个不同点:

  1. Hive大小写敏感,而Parquet不是
  2. Hive所有字段都是nullable的,而Parquet需要显示设置

由于以上原因,我们必须在Hive metastore Parquet table转Spark SQL Parquet table的时候,对Hive metastore schema做调整,调整规则如下:

  1. 两种schema中字段名和字段类型必须一致(不考虑nullable)。调和后的字段类型必须在Parquet格式中有相对应的数据类型,所以nullable是也是需要考虑的。
  2. 调和后Spark SQL Parquet table schema将包含以下字段:
    • 只出现在Parquet schema中的字段将被丢弃
    • 只出现在Hive metastore schema中的字段将被添加进来,并显式地设为nullable。

刷新元数据

Spark SQL会缓存Parquet元数据以提高性能。如果Hive metastore Parquet table转换被启用的话,那么转换过来的schema也会被缓存。这时候,如果这些表由Hive或其他外部工具更新了,你必须手动刷新元数据。

// 注意,这里sqlContext是一个HiveContext
sqlContext.refreshTable("my_table")

配置

Parquet配置可以通过 SQLContext.setConf 或者 SQL语句中 SET key=value来指定。

属性名 默认值 含义
spark.sql.parquet.binaryAsString false 有些老系统,如:特定版本的Impala,Hive,或者老版本的Spark SQL,不区分二进制数据和字符串类型数据。这个标志的意思是,让Spark SQL把二进制数据当字符串处理,以兼容老系统。
spark.sql.parquet.int96AsTimestamp true 有些老系统,如:特定版本的Impala,Hive,把时间戳存成INT96。这个配置的作用是,让Spark SQL把这些INT96解释为timestamp,以兼容老系统。
spark.sql.parquet.cacheMetadata true 缓存Parquet schema元数据。可以提升查询静态数据的速度。
spark.sql.parquet.compression.codec gzip 设置Parquet文件的压缩编码格式。可接受的值有:uncompressed, snappy, gzip(默认), lzo
spark.sql.parquet.filterPushdown true 启用过滤器下推优化,可以讲过滤条件尽量推导最下层,已取得性能提升
spark.sql.hive.convertMetastoreParquet true 如果禁用,Spark SQL将使用Hive SerDe,而不是内建的对Parquet tables的支持
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.
ParquetOutputCommitter
Parquet使用的数据输出类。这个类必须是 org.apache.hadoop.mapreduce.OutputCommitter的子类。一般来说,它也应该是 org.apache.parquet.hadoop.ParquetOutputCommitter的子类。注意:1. 如果启用spark.speculation, 这个选项将被自动忽略

2. 这个选项必须用hadoop configuration设置,而不是Spark SQLConf

3. 这个选项会覆盖 spark.sql.sources.outputCommitterClass

Spark SQL有一个内建的org.apache.spark.sql.parquet.DirectParquetOutputCommitter, 这个类的在输出到S3的时候比默认的ParquetOutputCommitter类效率高。

spark.sql.parquet.mergeSchema false 如果设为true,那么Parquet数据源将会merge 所有数据文件的schema,否则,schema是从summary file获取的(如果summary file没有设置,则随机选一个)

JSON数据集

Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。

注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。

// sc是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 数据集是由路径指定的
// 路径既可以是单个文件,也可以还是存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推导出来的schema,可由printSchema打印出来
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 将DataFrame注册为table
people.registerTempTable("people")

// 跑SQL语句吧!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。

Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过spark-submit命令的–jars和–file选项来提交这些文件。

如果使用Hive,则必须构建一个HiveContext,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查询表的支持,以及对HiveQL的支持。用户没有现有的Hive部署,也可以创建一个HiveContext。如果没有在hive-site.xml里配置,那么HiveContext将会自动在当前目录下创建一个metastore_db目录,再根据HiveConf设置创建一个warehouse目录(默认/user/hive/warehourse)。所以请注意,你必须把/user/hive/warehouse的写权限赋予启动spark应用程序的用户。

// sc是一个已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 这里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

和不同版本的Hive Metastore交互

Spark SQL对Hive最重要的支持之一就是和Hive metastore进行交互,这使得Spark SQL可以访问Hive表的元数据。从Spark-1.4.0开始,Spark SQL有专门单独的二进制build版本,可以用来访问不同版本的Hive metastore,其配置表如下。注意,不管所访问的hive是什么版本,Spark SQL内部都是以Hive 1.2.1编译的,而且内部使用的Hive类也是基于这个版本(serdes,UDFs,UDAFs等)

以下选项可用来配置Hive版本以便访问其元数据:

属性名 默认值 含义
spark.sql.hive.metastore.version 1.2.1 Hive metastore版本,可选的值为0.12.0 到 1.2.1
spark.sql.hive.metastore.jars builtin 初始化HiveMetastoreClient的jar包。这个属性可以是以下三者之一:

  1. builtin

目前内建为使用Hive-1.2.1,编译的时候启用-Phive,则会和spark一起打包。如果没有-Phive,那么spark.sql.hive.metastore.version要么是1.2.1,要就是未定义

  1. maven

使用maven仓库下载的jar包版本。这个选项建议不要再生产环境中使用

  1. JVM格式的classpath。这个classpath必须包含所有Hive及其依赖的jar包,且包含正确版本的hadoop。这些jar包必须部署在driver节点上,如果你使用yarn-cluster模式,那么必须确保这些jar包也随你的应用程序一起打包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender
spark.sql.hive.metastore.barrierPrefixes (empty) 一个逗号分隔的类名前缀列表,这些类在每个Spark SQL所访问的Hive版本中都会被显式的reload。例如,某些在共享前缀列表(spark.sql.hive.metastore.sharedPrefixes)中声明为共享的Hive UD函数

用JDBC连接其他数据库

Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不需要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)

首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

远程数据库的表可以通过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。以下是选项列表:

属性名 含义
url 需要连接的JDBC URL
dbtable 需要读取的JDBC表。注意,任何可以填在SQL的where子句中的东西,都可以填在这里。(既可以填完整的表名,也可填括号括起来的子查询语句)
driver JDBC driver的类名。这个类必须在master和worker节点上都可用,这样各个节点才能将driver注册到JDBC的子系统中。
partitionColumn, lowerBound, upperBound, numPartitions 这几个选项,如果指定其中一个,则必须全部指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。因此,表中所有的行都会被分区然后返回。
fetchSize JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10)
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

疑难解答

  • JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
  • 一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。

性能调整

对于有一定计算量的Spark作业来说,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。

内存缓存

Spark SQL可以通过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减少内存占用和GC压力。你也可以用SQLContext.uncacheTable(“tableName”)来删除内存中的table。

你还可以使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。

属性名 默认值 含义
spark.sql.inMemoryColumnarStorage.compressed true 如果设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存批量的大小。增大批量大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

其他配置选项

以下选项同样也可以用来给查询任务调性能。不过这些选项在未来可能被放弃,因为spark将支持越来越多的自动优化。

属性名 默认值 含义
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置join操作时,能够作为广播变量的最大table的大小。设置为-1,表示禁用广播。注意,目前的元数据统计仅支持Hive metastore中的表,并且需要运行这个命令:ANALYSE TABLE <tableName> COMPUTE STATISTICS noscan
spark.sql.tungsten.enabled true 设为true,则启用优化的Tungsten物理执行后端。Tungsten会显式的管理内存,并动态生成表达式求值的字节码
spark.sql.shuffle.partitions 200 配置数据混洗(shuffle)时(join或者聚合操作),使用的分区数。

分布式SQL引擎

Spark SQL可以作为JDBC/ODBC或者命令行工具的分布式查询引擎。在这种模式下,终端用户或应用程序,无需写任何代码,就可以直接在Spark SQL中运行SQL查询。

运行Thrift JDBC/ODBC server

这里实现的Thrift JDBC/ODBC server和Hive-1.2.1中的HiveServer2是相同的。你可以使用beeline脚本来测试Spark或者Hive-1.2.1的JDBC server。

在Spark目录下运行下面这个命令,启动一个JDBC/ODBC server

./sbin/start-thriftserver.sh

这个脚本能接受所有 bin/spark-submit 命令支持的选项参数,外加一个 –hiveconf 选项,来指定Hive属性。运行./sbin/start-thriftserver.sh –help可以查看完整的选项列表。默认情况下,启动的server将会在localhost:10000端口上监听。要改变监听主机名或端口,可以用以下环境变量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者Hive系统属性 来指定

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

接下来,你就可以开始在beeline中测试这个Thrift JDBC/ODBC server:

./bin/beeline

下面的指令,可以连接到一个JDBC/ODBC server

beeline> !connect jdbc:hive2://localhost:10000

可能需要输入用户名和密码。在非安全模式下,只要输入你本机的用户名和一个空密码即可。对于安全模式,请参考beeline documentation.

Hive的配置是在conf/目录下的hive-site.xml,core-site.xml,hdfs-site.xml中指定的。

你也可以在beeline的脚本中指定。

Thrift JDBC server也支持通过HTTP传输Thrift RPC消息。以下配置(在conf/hive-site.xml中)将启用HTTP模式:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

同样,在beeline中也可以用HTTP模式连接JDBC/ODBC server:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

使用Spark SQL命令行工具

Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询。注意Spark SQL CLI目前还不支持和Thrift JDBC server通信。

用如下命令,在spark目录下启动一个Spark SQL CLI

./bin/spark-sql

Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你可以用这个命令查看完整的选项列表:./bin/spark-sql –help

升级指南

1.5升级到1.6

  • 从Spark-1.6.0起,默认Thrift server 将运行于多会话并存模式下(multi-session)。这意味着,每个JDBC/ODBC连接有其独立的SQL配置和临时函数注册表。table的缓存仍然是公用的。如果你更喜欢老的单会话模式,只需设置spark.sql.hive.thriftServer.singleSession为true即可。当然,你也可在spark-defaults.conf中设置,或者将其值传给start-thriftserver.sh –conf(如下):
./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...

1.4升级到1.5

  • Tungsten引擎现在默认是启用的,Tungsten是通过手动管理内存优化执行计划,同时也优化了表达式求值的代码生成。这两个特性都可以通过把spark.sql.tungsten.enabled设为false来禁用。
  • Parquet schema merging默认不启用。需要启用的话,设置spark.sql.parquet.mergeSchema为true即可
  • Python接口支持用点(.)来访问字段内嵌值,例如df[‘table.column.nestedField’]。但这也意味着,如果你的字段名包含点号(.)的话,你就必须用重音符来转义,如:table.`column.with.dots`.nested。
  • 列式存储内存分区剪枝默认是启用的。要禁用,设置spark.sql.inMemoryColumarStorage.partitionPruning为false即可
  • 不再支持无精度限制的decimal。Spark SQL现在强制最大精度为38位。对于BigDecimal对象,类型推导将会使用(38,18)精度的decimal类型。如果DDL中没有指明精度,默认使用的精度是(10,0)
  • 时间戳精确到1us(微秒),而不是1ns(纳秒)
  • 在“sql”这个SQL变种设置中,浮点数将被解析为decimal。HiveQL解析保持不变。
  • 标准SQL/DataFrame函数均为小写,例如:sum vs SUM。
  • 当推测任务被启用是,使用DirectOutputCommitter是不安全的,因此,DirectOutputCommitter在推测任务启用时,将被自动禁用,且忽略相关配置。
  • JSON数据源不再自动加载其他程序产生的新文件(例如,不是Spark SQL插入到dataset中的文件)。对于一个JSON的持久化表(如:Hive metastore中保存的表),用户可以使用REFRESH TABLE这个SQL命令或者HiveContext.refreshTable来把新文件包括进来。

1.3升级到1.4

DataFrame数据读写接口

根据用户的反馈,我们提供了一个新的,更加流畅的API,用于数据读(SQLContext.read)写(DataFrame.write),同时老的API(如:SQLCOntext.parquetFile, SQLContext.jsonFile)将被废弃。

有关SQLContext.read和DataFrame.write的更详细信息,请参考API文档。

DataFrame.groupBy保留分组字段

根据用户的反馈,我们改变了DataFrame.groupBy().agg()的默认行为,在返回的DataFrame结果中保留了分组字段。如果你想保持1.3中的行为,设置spark.sql.retainGroupColumns为false即可。

// 在1.3.x中,如果要保留分组字段"department", 你必须显式的在agg聚合时包含这个字段
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 而在1.4+,分组字段"department"默认就会包含在返回的DataFrame中
df.groupBy("department").agg(max("age"), sum("expense"))

// 要回滚到1.3的行为(不包含分组字段),按如下设置即可:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

1.2升级到1.3

在Spark 1.3中,我们去掉了Spark SQL的”Alpha“标签,并清理了可用的API。从Spark 1.3起,Spark SQL将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:DeveloperAPI或Experimental)“的API。

SchemaRDD重命名为DataFrame

对于用户来说,Spark SQL 1.3最大的改动就是SchemaRDD改名为DataFrame。主要原因是,DataFrame不再直接由RDD派生,而是通过自己的实现提供RDD的功能。DataFrame只需要调用其rdd方法就能转成RDD。

在Scala中仍然有SchemaRDD,只不过这是DataFrame的一个别名,以便兼容一些现有代码。但仍然建议用户改用DataFrame。Java和Python用户就没这个福利了,他们必须改代码。

统一Java和Scala API

在Spark 1.3之前,有单独的java兼容类(JavaSQLContext和JavaSchemaRDD)及其在Scala API中的镜像。Spark 1.3中将Java API和Scala API统一。两种语言的用户都应该使用SQLContext和DataFrame。一般这些类中都会使用两种语言中都有的类型(如:Array取代各语言独有的集合)。有些情况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。

另外,java特有的类型API被删除了。Scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。

隐式转换隔离,DSL包移除 – 仅针对scala

Spark 1.3之前的很多示例代码,都在开头用 import sqlContext._,这行将会导致所有的sqlContext的函数都被引入进来。因此,在Spark 1.3我们把RDDs到DataFrames的隐式转换隔离出来,单独放到SQLContext.implicits对象中。用户现在应该这样写:import sqlContext.implicits._

另外,隐式转换也支持由Product(如:case classes或tuples)组成的RDD,但需要调用一个toDF方法,而不是自动转换。

如果需要使用DSL(被DataFrame取代的API)中的方法,用户之前需要导入DSL(import org.apache.spark.sql.catalyst.dsl), 而现在应该要导入 DataFrame API(import org.apache.spark.sql.functions._)

移除org.apache.spark.sql中DataType别名 – 仅针对scala

Spark 1.3删除了sql包中的DataType类型别名。现在,用户应该使用 org.apache.spark.sql.types中的类。

UDF注册挪到sqlContext.udf中 – 针对java和scala

注册UDF的函数,不管是DataFrame,DSL或者SQL中用到的,都被挪到SQLContext.udf中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF注册保持不变。

Python DataTypes不再是单例

在python中使用DataTypes,你需要先构造一个对象(如:StringType()),而不是引用一个单例。

Shark用户迁移指南

调度

用户可以通过如下命令,为JDBC客户端session设定一个Fair Scheduler pool。

SET spark.sql.thriftserver.scheduler.pool=accounting;

Reducer个数

在Shark中,默认的reducer个数是1,并且由mapred.reduce.tasks设定。Spark SQL废弃了这个属性,改为 spark.sql.shuffle.partitions, 并且默认200,用户可通过如下SET命令来自定义:

SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

你也可以把这个属性放到hive-site.xml中来覆盖默认值。

目前,mapred.reduce.tasks属性仍然能被识别,并且自动转成spark.sql.shuffle.partitions

缓存

shark.cache表属性已经不存在了,并且以”_cached”结尾命名的表也不再会自动缓存。取而代之的是,CACHE TABLE和UNCACHE TABLE语句,用以显式的控制表的缓存:

CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

注意:CACHE TABLE tbl 现在默认是饥饿模式,而非懒惰模式。再也不需要手动调用其他action来触发cache了!

从Spark-1.2.0开始,Spark SQL新提供了一个语句,让用户自己控制表缓存是否是懒惰模式

CACHE [LAZY] TABLE [AS SELECT] ...

以下几个缓存相关的特性不再支持:

  • 用户定义分区级别的缓存逐出策略
  • RDD 重加载
  • 内存缓存直接写入策略

兼容Apache Hive

Spark SQL设计时考虑了和Hive metastore,SerDes以及UDF的兼容性。目前这些兼容性斗是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本的Hive metastore(从0.12.0到1.2.1,参考:http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore

部署在已有的Hive仓库之上

Spark SQL Thrift JDBC server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的Hive安装版本。你不需要修改已有的Hive metastore或者改变数据的位置,或者表分区。

支持的Hive功能

Spark SQL 支持绝大部分Hive功能,如:

  • Hive查询语句:
    • SELECT
    • GROUP BY
    • ORDER BY
    • CLUSTER BY
    • SORT BY
  • 所有的Hive操作符:
    • Relational operators (===<><>>=<=, etc)
    • Arithmetic operators (+-*/%, etc)
    • Logical operators (AND&&OR||, etc)
    • Complex type constructors
    • Mathematical functions (signlncos, etc)
    • String functions (instrlengthprintf, etc)
  • 用户定义函数(UDF)
  • 用户定义聚合函数(UDAF)
  • 用户定义序列化、反序列化(SerDes)
  • 窗口函数(Window functions)
  • Joins
    • JOIN
    • {LEFT|RIGHT|FULL} OUTER JOIN
    • LEFT SEMI JOIN
    • CROSS JOIN
  • Unions
  • 查询子句
    • SELECT col FROM ( SELECT a + b AS col from t1) t2
  • 采样
  • 执行计划详细(Explain)
  • 分区表,包括动态分区插入
  • 视图
  • 所有Hive DDL(data definition language):
    • CREATE TABLE
    • CREATE TABLE AS SELECT
    • ALTER TABLE
  • 绝大部分Hive数据类型:
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY
    • TIMESTAMP
    • DATE
    • ARRAY<>
    • MAP<>
    • STRUCT<>

不支持的Hive功能

以下是目前不支持的Hive特性的列表。多数是不常用的。

不支持的Hive常见功能

  • bucket表:butcket是Hive表的一个哈希分区

不支持的Hive高级功能

  • UNION类操作
  • 去重join
  • 字段统计信息收集:Spark SQL不支持同步的字段统计收集

Hive输入、输出格式

  • CLI文件格式:对于需要回显到CLI中的结果,Spark SQL仅支持TextOutputFormat。
  • Hadoop archive — Hadoop归档

Hive优化

一些比较棘手的Hive优化目前还没有在Spark中提供。有一些(如索引)对应Spark SQL这种内存计算模型来说并不重要。另外一些,在Spark SQL未来的版本中会支持。

  • 块级别位图索引和虚拟字段(用来建索引)
  • 自动计算reducer个数(join和groupBy算子):目前在Spark SQL中你需要这样控制混洗后(post-shuffle)并发程度:”SET spark.sql.shuffle.partitions=[num_tasks];”
  • 元数据查询:只查询元数据的请求,Spark SQL仍需要启动任务来计算结果
  • 数据倾斜标志:Spark SQL不会理会Hive中的数据倾斜标志
  • STREAMTABLE join提示:Spark SQL里没有这玩艺儿
  • 返回结果时合并小文件:如果返回的结果有很多小文件,Hive有个选项设置,来合并小文件,以避免超过HDFS的文件数额度限制。Spark SQL不支持这个。

参考

数据类型

Spark SQL和DataFrames支持如下数据类型:

  • Numeric types(数值类型)
    • ByteType: 1字节长的有符号整型,范围:-128 到 127.
    • ShortType: 2字节长有符号整型,范围:-32768 到 32767.
    • IntegerType: 4字节有符号整型,范围:-2147483648 到 2147483647.
    • LongType: 8字节有符号整型,范围: -9223372036854775808 to 9223372036854775807.
    • FloatType: 4字节单精度浮点数。
    • DoubleType: 8字节双精度浮点数
    • DecimalType: 任意精度有符号带小数的数值。内部使用java.math.BigDecimal, BigDecimal包含任意精度的不缩放整型,和一个32位的缩放整型
  • String type(字符串类型)
    • StringType: 字符串
  • Binary type(二进制类型)
    • BinaryType: 字节序列
  • Boolean type(布尔类型)
    • BooleanType: 布尔类型
  • Datetime type(日期类型)
    • TimestampType: 表示包含年月日、时分秒等字段的日期
    • DateType: 表示包含年月日字段的日期
  • Complex types(复杂类型)
    • ArrayType(elementType, containsNull):数组类型,表达一系列的elementType类型的元素组成的序列,containsNull表示数组能否包含null值
    • MapType(keyType, valueType, valueContainsNull):映射集合类型,表示一个键值对的集合。键的类型是keyType,值的类型则由valueType指定。对应MapType来说,键是不能为null的,而值能否为null则取决于valueContainsNull。
    • StructType(fields):表示包含StructField序列的结构体。
      • StructField(name, datatype, nullable): 表示StructType中的一个字段,name是字段名,datatype是数据类型,nullable表示该字段是否可以为空

所有Spark SQL支持的数据类型都在这个包里:org.apache.spark.sql.types,你可以这样导入之:

import  org.apache.spark.sql.types._
Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull])注意:默认containsNull为true
MapType scala.collection.Map MapType(keyTypevalueType, [valueContainsNull])注意:默认valueContainsNull为true
StructType org.apache.spark.sql.Row StructType(fields)注意:fields是一个StructFields的序列,并且同名的字段是不允许的。
StructField 定义字段的数据对应的Scala类型(例如,如果StructField的dataType为IntegerType,则其数据对应的scala类型为Int) StructField(namedataTypenullable)

NaN语义

这是Not-a-Number的缩写,某些float或double类型不符合标准浮点数语义,需要对其特殊处理:

  • NaN == NaN,即:NaN和NaN总是相等
  • 在聚合函数中,所有NaN分到同一组
  • NaN在join操作中可以当做一个普通的join key
  • NaN在升序排序中排到最后,比任何其他数值都大


FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (1)
    • eli
    • 2018/02/06 5:01下午

    发现一个错误。在 “Hive/Parquet schema调和” 这里。
    官网原文是 Hive is case insensitive, while Parquet is not .
    正确翻译应该是 “Hive大小写 不 敏感,而Parquet不是”。 上文中的翻译却是”Hive大小写敏感,而Parquet不是”

您必须 登陆 后才能发表评论

return top