Spark Catalyst optimizer and query optimization

krishnaprasad k
4 min read · Jul 24, 2021

The term optimization refers to the process of making a system work more efficiently with the same amount of resources.

Spark SQL is one of the most important components of Apache Spark; it handles both SQL queries and the DataFrame API. At the core of Spark SQL lies the Catalyst optimizer.

The Catalyst optimizer supports both rule-based and cost-based optimization. Rule-based optimization applies a fixed set of rules to determine how to execute the query, while cost-based optimization generates several candidate plans, estimates their cost, and chooses the most suitable way to carry out the SQL statement. Now let us look at the structure of the Catalyst optimizer in detail and see how it optimizes SQL queries.
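Rule-based optimizations such as predicate pushdown and constant folding are always applied, whereas cost-based optimization depends on table and column statistics and is switched off by default. Below is a minimal sketch of enabling it and collecting statistics; the table name medals is only a hypothetical illustration.

// Cost-based optimization relies on statistics, so it is disabled by default.
spark.conf.set("spark.sql.cbo.enabled", "true")

// Collect table-level and column-level statistics for the cost model.
// "medals" is a hypothetical table name used only for illustration.
spark.sql("ANALYZE TABLE medals COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE medals COMPUTE STATISTICS FOR COLUMNS Country, Year")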

The Catalyst optimizer

(Figure: the Catalyst optimizer)

Spark SQL accepts both SQL queries and DataFrame operations; let us now walk through the optimization procedure step by step.

  • Unresolved Logical Plan

The unresolved logical plan describes how you want to transform your data. It is called unresolved because entities such as column names and table names have not yet been validated at this stage.

scala> val df = spark.read.option("header", true)
         .csv("/user/test/olympic-summary/Summer-Olympic-medals-1976-to-2008.csv")
         .orderBy("Year")
         .filter(col("Country") === "East Germany")
         .count()

The query reads a dataset of Summer Olympic medals awarded from 1976 to 2008, orders it by year, filters the rows for East Germany, and takes the count.

The unresolved logical plan looks like the following:

Aggregate [count(1) AS count#1197L]
+- AnalysisBarrier
   +- Filter (Country#1169 = East Germany)
      +- Sort [Year#1162 ASC NULLS FIRST], true
         +- Relation[City#1161,Year#1162,Sport#1163,Discipline#1164,Event#1165,Athlete#1166,Gender#1167,Country_Code#1168,Country#1169,Event_gender#1170,Medal#1171] csv

We can see that it closely mirrors our query: read the CSV file, sort by year, and filter the rows for East Germany.

Logical Plan

The catalog holds the metadata about our tables. In the analysis phase, column names and table names are resolved and validated against the catalog:

count: bigint
Aggregate [count(1) AS count#1197L]
+- Filter (Country#1169 = East Germany)
   +- Sort [Year#1162 ASC NULLS FIRST], true
      +- Relation[City#1161,Year#1162,Sport#1163,Discipline#1164,Event#1165,Athlete#1166,Gender#1167,Country_Code#1168,Country#1169,Event_gender#1170,Medal#1171] csv

Optimized Logical Plan

Logical optimization is the phase where the first real optimization takes place. The result of this phase is an optimized logical plan, in which the operators of the query may be reordered:

Aggregate [count(1) AS count#1197L]
+- Project
   +- Sort [Year#1162 ASC NULLS FIRST], true
      +- Project [Year#1162]
         +- Filter (isnotnull(Country#1169) && (Country#1169 = East Germany))
            +- Relation[City#1161,Year#1162,Sport#1163,Discipline#1164,Event#1165,Athlete#1166,Gender#1167,Country_Code#1168,Country#1169,Event_gender#1170,Medal#1171] csv

Now we can see the reordering: the filter is pushed down and applied first, and only afterwards is the sort applied.

Physical plan

This is the final step of optimization. One or more physical plans are generated for the query, each physical plan is evaluated against the cost model, and the best plan is selected. Code is then generated from the selected physical plan and executed.
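All of these plans can be inspected from the shell. The sketch below re-builds the same CSV query without the final count() action, so that a DataFrame is left to inspect; explain(true) and queryExecution are standard Spark APIs.

import org.apache.spark.sql.functions.col

val medals = spark.read.option("header", true)
  .csv("/user/test/olympic-summary/Summer-Olympic-medals-1976-to-2008.csv")
  .orderBy("Year")
  .filter(col("Country") === "East Germany")

// extended = true prints the parsed, analyzed and optimized logical plans
// followed by the physical plan.
medals.explain(true)

// The individual plans are also exposed on queryExecution.
println(medals.queryExecution.logical)       // unresolved (parsed) logical plan
println(medals.queryExecution.analyzed)      // resolved logical plan
println(medals.queryExecution.optimizedPlan) // optimized logical plan
println(medals.queryExecution.executedPlan)  // physical plan chosen for execution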

DAG Analysis

Based on the execution plan, Spark creates a Directed Acyclic Graph (DAG). Now let us analyze the steps and stages in a simple DAG, using a count example and a join example.

DAG created for a DataFrame count
  • WholeStageCodegen: whole-stage code generation is the process of generating the actual code that is executed for a stage of the physical plan.
val df = spark.read.option("header", true)
  .csv("/user/test/olympic-summary/Summer-Olympic-medals-1976-to-2008.csv")

df.queryExecution.debug.codegen()

The snippet above can be used to view the code generated for reading the CSV file.
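Another way to reach the same output, assuming the debug helpers that ship with Spark SQL, is the debug package, which adds a debugCodegen method to every Dataset:

// The debug package adds debug() and debugCodegen() to Datasets/DataFrames.
import org.apache.spark.sql.execution.debug._

val medals = spark.read.option("header", true)
  .csv("/user/test/olympic-summary/Summer-Olympic-medals-1976-to-2008.csv")

// Prints the physical plan together with the whole-stage generated code.
medals.debugCodegen()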

  • HashAggregate: Spark has an AggUtils object in which the aggregation planning methods are defined. These methods are invoked when aggregation functions such as min, max, and count are planned.
planAggregateWithOneDistinct(
    groupingExpressions: Seq[NamedExpression],
    functionsWithDistinct: Seq[AggregateExpression],
    functionsWithoutDistinct: Seq[AggregateExpression],
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan): Seq[SparkPlan]

The planAggregateWithOneDistinct method is called when the aggregation contains a single DISTINCT aggregate expression.

Exchanges

An exchange in Spark is the process by which data is transferred between different nodes. Exchanges in Spark can be broadly classified into two types:

  • Broadcast exchanges: the smaller dataset is broadcast to all the executor nodes.
  • Shuffle exchanges: data is redistributed among the nodes according to a partitioning scheme.

Now let us look at these exchanges in detail.

Broadcast exchange

import org.apache.spark.sql.functions.broadcast

val t1 = spark.range(10)
val t2 = spark.range(10)
val t3 = t1.join(broadcast(t2)).where(t1("id") === t2("id"))

Dataset t2 is broadcast to the different executors; broadcasting the smaller dataset is more efficient than shuffling both sides of the join. In Spark, only a dataset smaller than 8 GB can be broadcast.

scala> t3.explain()
== Physical Plan ==
*(2) BroadcastHashJoin [id#1363L], [id#1365L], Inner, BuildRight
:- *(2) Range (0, 10, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *(1) Range (0, 10, step=1, splits=8)
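The broadcast() call above is an explicit hint. Spark will also choose a broadcast exchange on its own whenever the estimated size of one join side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); a short sketch of tuning it:

// Raise the automatic broadcast threshold to 100 MB: any join side whose
// estimated size is below this limit is broadcast without an explicit hint.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Setting the threshold to -1 disables automatic broadcast joins entirely,
// forcing a shuffle-based join instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)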

Shuffle Exchanges

The repartition method, which redistributes data across a given number of Spark partitions, is an example of a shuffle exchange.

val q1 = spark.range(5).repartition(5)
scala> q1.explain()
== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *(1) Range (0, 5, step=1, splits=8)
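repartition(n) with just a number produces the RoundRobinPartitioning exchange shown above. Passing a column as well makes Spark hash-partition the data on that column instead; a small sketch:

import org.apache.spark.sql.functions.col

// Repartition by a column: rows with the same id land in the same partition,
// and the physical plan shows a hashpartitioning exchange instead of
// RoundRobinPartitioning.
val q2 = spark.range(5).repartition(5, col("id"))
q2.explain()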
