Apache Spark Optimization Techniques
Below are Spark optimization techniques that can reduce your data processing time and make your Spark applications more efficient.
1. Caching: Spark caching is used to save frequently used DataFrames and avoid re-computation. Caching saves your data in memory or on disk. Spark provides two functions, cache and persist. The cache function saves your DataFrame to memory and spills to disk if needed. The persist function lets you specify where to cache your data, either in memory or on disk. Be careful with the amount of data cached in memory, because cached data occupies your compute memory, and you want to make sure there is still enough memory left to process your data. Persisting the cache on disk can be considered since it does not occupy compute memory. It is also good practice to unpersist a cached DataFrame using the unpersist() method once the computations are completed. Note that it is not recommended to cache a table just read using "spark.read", because the Catalyst optimizer cannot apply query optimizations like predicate pushdown on a cached DataFrame.
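A minimal PySpark sketch of the above, assuming an existing SparkSession named spark and a hypothetical orders dataset:
# Python
from pyspark import StorageLevel

df = spark.read.parquet("/data/orders")          # hypothetical path
shipped = df.filter(df.status == "SHIPPED")      # hypothetical column

shipped.cache()          # default storage level: memory, spilling to disk if needed
shipped.count()          # an action materializes the cache

# Alternatively, persist() lets you pick the storage location explicitly:
# shipped.persist(StorageLevel.DISK_ONLY)

shipped.unpersist()      # release the cache once computations are done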
2. Optimize shuffle partitions: It is the duty of the Spark developer to set the shuffle partitions for every Spark application. The default is 200 partitions. In most cases you can set the shuffle partitions proportional to the number of cores in the cluster.
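A sketch of setting the shuffle partitions relative to the core count (the 96-core cluster here is a hypothetical example):
# Python
total_cores = 96                                   # hypothetical cluster size
spark.conf.set("spark.sql.shuffle.partitions", total_cores * 2)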
3. Use higher-order functions and avoid UDFs: Use built-in Spark SQL functions wherever possible and avoid UDFs. In PySpark, a Python UDF forces data to be serialized between the JVM and the Python process on every executor, which slows down query performance.
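A sketch contrasting a Python UDF with a built-in higher-order function on a hypothetical array column (transform is available as a DataFrame function from Spark 3.1):
# Python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

df = spark.createDataFrame([([1, 2, 3],)], ["nums"])    # hypothetical data

# Slower: a Python UDF serializes rows between the JVM and the Python worker
double_udf = F.udf(lambda xs: [x * 2 for x in xs], ArrayType(IntegerType()))
df.withColumn("doubled", double_udf("nums")).show()

# Faster: the built-in higher-order function runs entirely inside the JVM
df.withColumn("doubled", F.transform("nums", lambda x: x * 2)).show()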
4. Enable AQE: Adaptive Query Execution is a feature introduced in Spark 3.0 and enabled by default from Spark 3.2. AQE provides faster query performance by allowing Spark to change the query plan at run time while the query is being executed. This feature can be turned ON using spark.conf.set("spark.sql.adaptive.enabled", True). When AQE is ON, Spark knows the size of the data in the shuffle files from runtime statistics. So, while working out the query execution plan at run time, it can switch join strategies (choosing between sort-merge join and broadcast join based on the actual table sizes), coalesce the number of shuffle partitions, or optimize skew joins.
The three advantages of Adaptive Query Execution:
Join Strategy switch:
If you have a JOIN condition in your query, then while making the initial query plan Spark reads the size of each table (data is not loaded yet). If one of the tables is smaller than 10 MB, a broadcast join is used (the table is pushed to every executor's memory, so there is no data shuffle during the join). Otherwise the usual sort-merge join is used, which shuffles data between nodes and is expensive.
AQE enables Spark to modify the physical query plan at run time using runtime statistics. When the query is planned, Spark may initially have chosen a sort-merge join because both table sizes appeared to be more than 10 MB. But at run time, filters can be pushed down to the data source (predicate pushdown), which may reduce the size of a table actually loaded to less than 10 MB. In this case, if AQE is enabled, Spark can alter the query plan and change the sort-merge join to a broadcast join, which boosts query performance.
Coalesce shuffle partitions:
spark.sql.shuffle.partitions should be set by the developer for every job. Every wide transformation outputs as many partitions as the configured number of shuffle partitions. The default value is 200. You can set the shuffle partitions proportional to the total number of cores in the cluster. More on this is discussed later in this blog.
If AQE is ON, Spark chooses the optimal number of shuffle partitions itself based on runtime statistics.
NOTE: Set a higher shuffle partition count proportional to the total number of cores in the cluster so that Spark can automatically scale down the shuffle partitions based on runtime statistics (AQE needs to be ON).
Optimize Skew Joins:
Data Skew occurs when the data is unevenly distributed among the data partitions in memory.
If AQE is turned ON, Spark can detect skewness from shuffle file statistics while the query runs, and it can sub-partition the skewed data so that each task completes in approximately the same amount of time.
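The AQE behaviour described above is controlled by a handful of configs; a sketch of enabling them explicitly (most are already on by default in recent Spark versions):
# Python
spark.conf.set("spark.sql.adaptive.enabled", "true")                      # AQE itself
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   # coalesce shuffle partitions at run time
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")             # sub-partition skewed joins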
5. Broadcast join: A broadcast join distributes the complete small table (usually less than 15 MB) to all the worker nodes. Since the complete table is present on every worker node, there is no data shuffle, resulting in improved performance. The default broadcast threshold set by Spark is 10 MB, and it can be altered using the command below:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10 MB in bytes
A broadcast join speeds up the join operation by avoiding the shuffle. It is advantageous to broadcast only tables around or below 100 MB in size.
Spark can choose the optimal join strategy itself if Adaptive Query Execution (AQE) is turned ON.
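You can also hint the broadcast explicitly; a sketch with hypothetical table paths and join key:
# Python
from pyspark.sql.functions import broadcast

transactions = spark.read.parquet("/data/transactions")      # large table (hypothetical)
countries = spark.read.parquet("/data/country_codes")        # small table (hypothetical)

# Broadcasting the small table avoids shuffling the large one
joined = transactions.join(broadcast(countries), on="country_code", how="left")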
6. Avoid skew and spill: Skew is a condition where one of the data partitions is much larger than the others. That task runs longer than the other tasks, and the stage completes only after the skewed partition is processed. Skew may also result in spill if there is not enough executor memory. Spill is a condition where Spark writes intermediate data to disk because there is no available memory. Skew can be mitigated by turning AQE ON, and spill can be avoided by making sure there is enough Spark memory. Skew and spill can be detected in the Spark UI.
7. Avoid scanning issues:
Register highly partitioned files as tables. For example, for a file with year, month and day columns that is partitioned on those same year, month and day columns, it is better to register it as a table so that the table metadata is already registered in the metastore, resulting in faster query performance. Also avoid too many small files (e.g. hundreds of very small Parquet part files) by combining them into fewer files using functions like coalesce. Small files make queries slower, since Spark takes more time to scan the files in the folder.
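A sketch of compacting small files and registering the result as a table (paths and names are hypothetical):
# Python
df = spark.read.parquet("/data/events_raw")

# Compact many small part files into a handful of larger ones
df.coalesce(8).write.mode("overwrite").parquet("/data/events_compacted")

# Register the data as a table so its metadata lives in the metastore
spark.sql("CREATE TABLE IF NOT EXISTS events USING PARQUET LOCATION '/data/events_compacted'")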
8. Bucketing:
Bucketing is a mitigation strategy to avoid expensive shuffle operations for tables in the terabyte range or larger. It is predominantly used to join terabyte-sized tables.
Bucketing should only be used for datasets in the range of terabytes or more, where the tables are queried or joined frequently.
Bucketing pre-shuffles the data so that rows are located in specific partitions or part files. The location and content of the part files are tracked by the metastore.
When bucketing two tables that are frequently joined, both tables should be bucketed on the same column with the same number of buckets, even if one table is in the terabyte range and the other is in the MB range.
The number of Spark partitions when reading the files from disk equals the number of buckets on the dataset. Note that spark.sql.files.maxPartitionBytes is ignored when calculating the partitions here.
Bucketing is only useful when you join two tables very frequently and want to avoid the shuffle. Note that you are pre-shuffling the data to get it bucketed and then doing the join on the bucketed data for improved query performance. So, if you are joining the table only once, you might be better off without bucketing.
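A sketch of bucketing two frequently joined DataFrames (orders_df and customers_df are hypothetical) on the same key with the same number of buckets:
# Python
(orders_df.write
    .bucketBy(64, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("orders_bucketed"))

(customers_df.write
    .bucketBy(64, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("customers_bucketed"))

# Joining on the bucketed column can now avoid a full shuffle
joined = spark.table("orders_bucketed").join(spark.table("customers_bucketed"), "customer_id")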
9. Disk partitioning:
The advantages of disk partitioning are predicate pushdown, where Spark reads only the necessary partitions from the files (a significant performance gain), and reduced IO, because the partition column names and values are inferred from the subdirectory names. Both are finalized during scanning and NOT while reading the actual data. IO is reduced since the partition column values come from the directory names themselves instead of having to be read from every single record.
Use disk partitioning for low-cardinality columns like year, month and date, where a filter fetches a group of records. Disk partitioning helps apply predicate pushdown and reduces IO, since the partition column values can be inferred from the directories and sub-directories instead of having to read the part files.
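A sketch of writing and reading disk-partitioned data, assuming a hypothetical DataFrame df with year and month columns:
# Python
# Partition on low-cardinality columns when writing
df.write.partitionBy("year", "month").mode("overwrite").parquet("/data/sales_partitioned")

# A filter on the partition columns prunes whole directories instead of scanning every file
sales_jan = spark.read.parquet("/data/sales_partitioned").filter("year = 2023 AND month = 1")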
10. Z-order indexing:
Z-ordering is a technique that sorts the data in Delta tables. Z-ordering provides faster query performance by reading only the data files needed for your query while skipping the others.
Z-ordering reduces the scan time for highly selective queries (for example, filtering for a record that occurs only once in a dataset containing a billion records).
You can use Z-ordering on any number of columns, but you should order the columns by how often they are used in your queries. Adding more columns decreases the effectiveness of Z-ordering.
A Z-ordering code example is below:
-- SQL
OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType,date)
11. File format selection:
Prefer columnar formats when you need quick reads and row-based formats for quick writes. Row-based file formats like Avro offer faster writes but slower reads; columnar file formats like ORC and Parquet offer slower writes compared to Avro but faster read performance. Row-based file formats are used for files that are written and read once. Columnar file formats are used for files that are read and transformed more frequently.
Parquet is the most used file format in Spark. It provides faster query performance through data skipping, and the data is always compressed on storage, saving storage space. Data skipping is a process where Spark reads only the necessary rows and columns from the Parquet file to process a query instead of having to read the entire file.
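A sketch of writing the same hypothetical DataFrame in a columnar and a row-based format (writing Avro needs the external spark-avro package on the cluster):
# Python
# Columnar Parquet: slower writes, faster reads, good for frequently queried data
df.write.mode("overwrite").parquet("/data/analytics/events_parquet")

# Row-based Avro: faster writes, slower reads, good for write-once workloads
df.write.mode("overwrite").format("avro").save("/data/ingest/events_avro")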
12. Pass the schema manually:
Pass the schema manually whenever possible, because Spark has to scan the entire file for CSV and JSON formats to infer the schema. Even for Parquet, Spark has to read at least one part file to infer the schema.
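A sketch of passing a schema manually when reading a CSV file (the column names here are hypothetical):
# Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer", StringType(), True),
    StructField("amount", DoubleType(), True),
])

# No extra scan of the file is needed to infer the schema
orders = spark.read.schema(schema).csv("/data/orders.csv", header=True)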
13. Filter rows and columns as early as possible:
Apply filters to keep only the necessary rows and select only the necessary columns before doing transformations on the data. This reduces processing time and the compute power needed.
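A sketch of pruning rows and columns before the heavier transformations (path and columns are hypothetical):
# Python
df = (spark.read.parquet("/data/transactions")
        .select("customer_id", "amount", "txn_date")        # keep only needed columns
        .filter("txn_date >= '2023-01-01'"))                 # keep only needed rows

result = df.groupBy("customer_id").sum("amount")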
14. Databricks Disk Cache (Delta cache):
The disk cache is a Databricks proprietary feature that saves Parquet files on the cluster nodes' local disks when you read a Parquet file from a remote source like S3/ADLS. Subsequent reads of the data are served from the nodes' local disks instead of the remote source.
This feature is automatically enabled by Databricks and is useful when you choose cluster nodes with SSD-based local storage. Check whether the disk cache is enabled using the command below:
spark.conf.get("spark.databricks.io.cache.enabled")
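To turn it on explicitly, set the same config:
# Python
spark.conf.set("spark.databricks.io.cache.enabled", "true")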
For more info on the disk cache, refer to this Databricks blog.
I hope you found this useful! Let me know in the comments section if there are any other important optimization techniques I should add to this blog. Happy learning!