弱鸡了吧?背各种SparkSQL调优参数?这个东西才是SparkSQL必须要懂的
- Spark SQL在Spark集群中是如何执行的?
- Unresolved执行计划和Resolved执行计划的区别什么?
- 逻辑执行计划和物理执行的区别?
- Spark SQL是如何对SQL进行优化的?
- Spark SQL中的Codegen是个什么组件?
执行计划这个词大家都不会感到陌生,而且基本上只要是支持SQL的计算引擎,基本上都会有执行计划。而当要优化SparkSQL应用时,一定是要了解SparkSQL执行计划的。不然,发现SQL执行很慢,根本发现不了问题在哪儿,应该在哪儿进行优化,应该是调整SQL的编写方式、还是用Hint、还是调参,而不是把SparkSQL优化方案拿来试一遍
执行计划从字面上来理解就是要执行某一项工作,执行这一项工作需要有个计划。执行计划是将SQL转换为一组经过优化后的逻辑和物理操作。在Spark SQL中,执行计划就是经过一系列组件的处理、优化,将SQL转换为一组Job(DAG),再放入到Spark Executors中执行的过程。
在Spark SQL中,执行计划是了解SQL执行详细信息的利器。 它包含许多有用的信息,清晰地描述了计划是如何执行的。在我们发现某些SQL执行效率低下时,可以根据执行计划中的信息,来发现效率低下的问题,并可以修改部分SQL查询语句或者调整参数以获得更好的执行性能。
Spark SQL中的执行计划分为两种:
逻辑执行计划
逻辑执行计划是对需要执行的所有转换步骤的简单描述,并不涉及具体该怎么执行。SparkContext负责生成和保存逻辑执行计划。逻辑执行计划是对特定的表进行一系列的转换操作,例如:Join、Filter、where、groupby等等,它描述的是SQL语句想要的执行图。
物理执行计划
物理执行计划是确定连接的类型、分区的数量、过滤器、where、groupBy子句的执行顺序等等。
这张图是Spark SQL执行计划的处理流程。
先不说具体的细节,感觉这个流程是比较长的。学习Java的同学可能知道,一个Java代码先要经过编译层字节码,然后将字节码通过classloader加载,并存储在JVM方法区中。执行找到main方法,然后开始执行其中的指令。其实Java的执行流程也比较长,只不过我们日常CRUD的时候根本考虑不到这些。
核心的执行过程一共有5个步骤:
这些操作和计划都是Spark SQL自动处理的,Spark SQL会议最优地方式来执行编写的SQL语句。
这几个步骤的操作会生成各种计划:
- Unresolved逻辑执行计划
- Resolved逻辑执行计划
- 优化后的逻辑执行计划
- 物理执行计划
此处,我用一个简单点的示例来测试:
case class Item(id:Int, name:String, price:Float)
case class Order(id:Int, itemId:Int, count:Int)
上述通过scala的两个样例类定义了两个表的schema,一个Item表有三个字段,另外一个是Order表也有三个字段。其中,Order中有itemId和Item的id是关联的。
val itemSumDF = spark.sql(
"""
| select
| ITEMS.name,
| ITEMS.price,
| SUM(ORDERS.count) as c
| from ITEMS, ORDERS
| where
| ITEMS.id=ORDERS.itemid
| and ITEMS.id=2
| group by
| ITEMS.name
| , ITEMS.price
""".stripMargin)
从3.0开始,explain方法有一个新的参数mode,该参数可以指定以什么样的格式展示执行计划:
- explain(mode=”simple”):只展示物理执行计划。
- explain(mode=”extended”):展示物理执行计划和逻辑执行计划。
- explain(mode=”codegen”) :展示要Codegen生成的可执行Java代码。
- explain(mode=”cost”):展示优化后的逻辑执行计划以及相关的统计。
- explain(mode=”formatted”):以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。
案例,运行结果如下:
+-----------------+-------+---+
| name| price| c|
+-----------------+-------+---+
|iphone 11 pro max|10500.0| 4|
+-----------------+-------+---+
Unresolved逻辑执行计划
Resolved逻辑执行计划
Optimized逻辑执行计划
物理执行计划
formatted方式输出的物理执行计划(这种方式可读性更好):
(1) LocalTableScan[codegen id : 1]
Output[3]:[id#0, name#1, price#2]
Arguments:[id#0, name#1, price#2]
(2) LocalTableScan
Output[2]:[itemId#7, count#8]
Arguments:[itemId#7, count#8]
(3) BroadcastExchange
Input[2]:[itemId#7, count#8]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false]as bigint))),[id=#71]
(4) BroadcastHashJoin[codegen id : 1]
Left keys[1]:[id#0]
Right keys[1]:[itemid#7]
Join condition: None
…其他省略
执行计划的总体结构是一颗树,每个节点表示一个操作符,这个操作符描述了执行的一些操作(针对物理执行计划为:对哪张表的、哪个字段操作等等)。
Spark SQL中的Parser组件检查SQL语法上是否有问题,然后生成Unresolved(未决断)的逻辑计划。这也是逻辑执行计划的第一个版本。之所以叫做Unresolved逻辑执行计划,因为SQL语法可能是正确的,但有一些表名或者列名不存在。这一步是不检查表名、不检查列名的。
此外,SparkSQL中有一个名为Catalog的组件,这个组件其实是一个存储库。它里面包含了SparkSQL表信息、DataFrame、以及DataSet的所有信息,如果元数据来自于Hive,它会将MySQL存储的元数据拉入到Catalog组件中。
我们来看一下Spark SQL的输出:
==Parsed Logical Plan=='Aggregate['ITEMS.name, 'ITEMS.price],['ITEMS.name, 'ITEMS.price, 'SUM('ORDERS.count) AS c#12]
+- 'Filter (('ITEMS.id='ORDERS.itemid) AND ('ITEMS.id=2))
+- 'Join Inner
:- 'UnresolvedRelation[ITEMS]
+- 'UnresolvedRelation[ORDERS]
这个Unresolved逻辑执行计划,并没有明确解析关系表的名称和列。
我们之前说执行计划是一颗树。为了方便大家理解,举个简单的例子:1 + (3 + 4) * 2,这个算术表达式我们通过大脑很容易计算出来结果。其实它也可以将它描述为一颗树。
通过中缀遍历,我们就可以将该表达式计算出来。同样,执行计划也可以用一颗树来表示。如下所示,
只不过之前算术表达式的计算是+、-、*、/,而这里的操作是filter(过滤)、join inner(关联),操作的数据也不是之前的1、2、3、4,而是UnresolvedRelation(关系)——其实就是后边的表。
SparkSQL中的Analyzer组件会先分析之前生成的Unresolved逻辑执行计划,并通过访问Spark中的Catalog存储库来进行表名、列名的解析、验证。例如:之前的'UnresolvedRelation[ITEMS]、'UnresolvedRelation[ORDERS]标记为Unresolved的关系,在这个过程会被处理为实际的表和实际的列名。
在这个类似于元数据库的Catalog中,进一步地进行语义分析、验证数据结构、模式(schema)、类型等。如果一切都很顺利,那么该逻辑执行计划会标记为Resolved逻辑执行计划。这个操作是由Spark SQL中的Analyzer组件完成的,它帮助我们从Catalog存储库中解析验证语义、列名、表名等。如果Analyzer组件无法解析表名、列名则会失败。否则,就会生成Resolved逻辑执行计划。
我们来看一下逻辑计划树优化前后的对比:
==Analyzed Logical Plan==
name: string, price: float, c: bigint
Aggregate[name#1, price#2],[name#1, price#2, sum(cast(count#8 as bigint)) AS c#12L]
+- Filter ((id#0=itemid#7) AND (id#0=2))
+- Join Inner
:- SubqueryAlias items
: +- LocalRelation[id#0, name#1, price#2]
+- SubqueryAlias orders
+- LocalRelation[id#6, itemId#7, count#8]
我们看到以下变化:
- 所有的列的名称都加上了#1、#2等
- 在sum方法中进行了一些类型转换
- UnresolvedRelation也会处理为了别名,也解析出来了是LocalRelation(因为此处数据是在Driver代码中生成在本地的),而且每个关系上的列也都解析出来了。
生成了Analyzed逻辑执行计划之后,该逻辑执行计划会传递给Catalyst Optimizer,Catalysts Optimizer是Spark SQL重要的优化器,它根据各种规则(例如:过滤器、聚合)进行优化。它将逻辑操作重新排序以优化逻辑执行计划。例如:
- 在多表关联查询时,它来决定执行顺序
- 尝试在进行执行Project(投影查询)之前,评估Filter来优化查询等等。
优化结束后的逻辑执行计划就是Optimized逻辑执行计划。
如下所示:
==Optimized Logical Plan==Aggregate[name#1, price#2],[name#1, price#2, sum(cast(count#8 as bigint)) AS c#12L]
+- Project[name#1, price#2, count#8]
+- Join Inner, (id#0=itemid#7)
:- LocalRelation[id#0, name#1, price#2]
+- LocalRelation[itemId#7, count#8]
我们观察下,发现Filter已经被移动到了Join Inner节点(其实就是谓词下推的过程,可以有效减少表关联的数据量)。而且SubqueryAlias也已经被处理掉了,第二个LocalRelation也把id#6移除了。
select
ITEMS.name,
ITEMS.price,
SUM(ORDERS.count) as c
from ITEMS, ORDERS
where
ITEMS.id=ORDERS.itemid
and ITEMS.id=2
group by
ITEMS.name
, ITEMS.price
我们的SQL语句中确实也没有用到ORDERS表中的ID。
SparkSQL要能够在Spark执行,是必须要生成物理执行计划的。Spark SQL中的Planner组件依据Catalyst Optimizer基于各种策略会生成一个或多个物理执行计划。
==Physical Plan==* HashAggregate (8)
+- Exchange (7)
+- * HashAggregate (6)
+- * Project (5)
+- * BroadcastHashJoin Inner BuildRight (4)
:- * LocalTableScan (1)
+- BroadcastExchange (3)
+- LocalTableScan (2)
这里是将两个表进行关联,这两个表在实际开发中有可能是一个是小表、一个是大表,这些表中有可能数据是以不同分区的形式存储的,不同的分区散布在整个集群的不同节点中,Spark SQL会判断先关联哪些分区、连接的类型,以获得更好的性能。
这里先简单介绍下物理执行计划的操作符,后续我们还会用专门一个篇幅来介绍这些操作符。
- HashAggregate运算符表示数据聚合,一般HashAggregate是成对出现,第一个HashAggregate是将执行节点本地的数据进行局部聚合,另一个HashAggregate是将各个分区的数据进一步进行聚合计算。
- Exchange运算符其实就是shuffle,表示需要在集群上移动数据。很多时候HashAggregate会以Exchange分隔开来。
- Project运算符是SQL中的投影操作,就是选择列(例如:select name, age…)
- BroadcastHashJoin运算符表示通过基于广播方式进行HashJoin
- LocalTableScan运算符就是全表扫描本地的表
选择最佳物理执行计划
Planner组件会基于Cost Model(成本模型)会根据执行时间和资源消耗来预测估计每个物理执行计划,并且选择其中一个作为最终的追加物理执行计划。
==Optimized Logical Plan==Aggregate[name#1, price#2],[name#1, price#2, sum(cast(count#8 as bigint)) AS c#12L], Statistics(sizeInBytes=523.0 B)
+- Project[name#1, price#2, count#8], Statistics(sizeInBytes=471.0 B)
+- Join Inner, (id#0=itemid#7), Statistics(sizeInBytes=576.0 B)
:- LocalRelation[id#0, name#1, price#2], Statistics(sizeInBytes=36.0 B)
+- LocalRelation[itemId#7, count#8], Statistics(sizeInBytes=16.0 B)
这个执行计划中列举出来了每个过程的统计数据,里面包含了数据量的大小。点赞!
我们来看一下SparkSQL的物理执行计划。
通过WebUI,我们看到这个SQL的执行过程。注意WebUI是一颗颠倒的树,而在文本展示的执行计划是一颗正常的树展示。
选择了最佳物理计划后,就该查询生成可执行代码(RDD的DAG)了,该SQL语句将在集群中以分布式方式执行。
Codegen是可选的,默认spark.sql.codegen.wholeStage为true,是打开的。如果禁用了Codegen,Spark将使用物理执行计划直接开始在每个子节点中运行。Codegen阶段其实就是对物理执行计划进一步地进行优化。它将多个物理运算符(或者是在物理执行计划树中支持codegen的子树)融合到一个Java函数中,这个过程称之为Whole-Stage Java Code Generation。
我们来对比一下配置了Codegen和没有配置Codegen的差别。
大家看出来区别了吗?
没有配置Codegen,是没有两个大蓝色框的。像第一个蓝色框中的操作,是通过Codegen将这些操作串联在一个代码单元中。
总结:
Spark SQL在Spark集群中是如何执行的?
Spark SQL会经过以下过程,
- Parser组件将SQL转换为Unresolved逻辑执行计划
- Analyzer组件通过获取Catalog存储库将Unresolved逻辑执行计划处理为Resolved逻辑执行计划
- Catalyst Optimizer组件,将Resolved逻辑执行计划转换为Optimized逻辑执行计划
- Planner组件将Optimized逻辑执行计划转换为物理执行计划
- Planner组件对上一步的物理执行计划进行评估,选择出最终的物理执行计划
- Code Generation对物理执行计划进一步优化,将一些操作串联在一起
- 生成Job(DAG)由scheduler调度到spark executors中执行
Unresolved执行计划和Resolved执行计划的区别什么?
Unresolved执行计划对SQL语法解析,而Resolved执行计划会从Catalog中拉取元数据,解析表名和列名。
逻辑执行计划和物理执行计划的区别?
逻辑执行计划只是对SQL语句中以什么样的执行顺序做一个整体描述,而物理执行计划中包含了具体要进行什么的操作。例如:是BroadcastJoin、还是SortMergeJoin等等。
Spark SQL是如何对SQL进行优化的?
由Catalyst Optimizer组件根据一系列规则对SQL进行优化,是对逻辑执行计划进行优化。例如:我们常听说的谓词下推就是其中一个规则。
Spark SQL中的Codegen是个什么组件?
用来将一些非shuffle的操作整合到一个whole-stage中,Spark SQL会针对这些操作生成Java代码放在executor中执行。