Use Relational Cache to Accelerate EMR Spark in Data Analysis

Image for post
Image for post

Cache can broadly be defined as hardware or software of some kind that is used in various fields and directions of data processing. For many computing devices, I/O access speeds are highly dependent on the storage medium used for caching. As such, typically, the I/O access speeds of HDDs slower than SDDs, which are slower than NVMes, which comes after Mem, and L3-L2-L1 Cache, and Register, and the CPU. Generally speaking, the closer the storage device is to the CPU, the lower the speed gap between computing and I/O access, and the faster the data processing speed. However, the costs increase as speeds increase, with capacity typically decreasing as well. Cache pushes the data to be processed closer to the computation at the cost of more resource consumption, accelerating data processing operations, and filling the gap between computing and I/O access speeds.

For Spark, specifically, file systems, such as HDFS cache and Alluxio, provide file-level cache services. By caching files to the memory, the data processing speed is accelerated, and it is completely transparent to computing frameworks, such as Spark.

In addition, another method for caching is also available. If the same data needs to be processed multiple times, and the processing logic is similar, we can cache the intermediate results, so that each time we process the data, we start processing from the intermediate results, eliminating the computation from the original data to the intermediate results. Cached data is closer to the computing result. Compared with the original data, the results can be obtained through less computation, and the processing speed can also be accelerated. Materialized views in data warehouses are typical applications of this cache type.

Spark also provides dataset-level cache. You can use SQL DDLs or Dataset APIs to cache relational data (not files) with schema information located in the memory. Subsequent data processing based on the dataset can save time for computing the dataset by directly reading the cached data in the memory. Unlike the materialized view in data warehouses, the current Spark dataset cache still has many shortcomings:

  1. Spark cached dataset can only be reused in the same Spark Context, and cannot be shared across Spark contexts. When Spark Context exits, the cached data is also deleted.
  2. The dataset cache only supports precise matching and reuse of execution plans. In other words, only when the execution plans of subsequent queries accurately matches the execution plans of cached datasets, can cache be used to optimize queries, which greatly reduces the optimization scope of cache.
  3. The cached dataset data can only be stored in memory or local disk. Larger data volumes require more memory. The persistent data is serialized binary data with no data schema information, which is costly to deserialize and SQL optimization processing, such as project filter push-down, cannot be supported.

How Relational Cache Works

Because of the shortcomings mentioned above, Spark dataset cache is not widely used in practical applications, and it often cannot meet the requirements of interactive analysis scenarios, such as Star model-based multi-dimensional data analysis. Generally, the cube is built in advance and the SQL execution plan is rewritten, so to meet the sub-second level interactive analysis requirements. Relational Cache, conversely, aims to take into account the usability of Spark dataset cache and the optimization efficiency of materialized views. Its main objectives include:

  1. Users can cache any relational data, including tables, views or datasets. The cache support for any relational data can greatly extend the scope of use of the Relational Cache. Any use case that includes repeated computing or pre-determined computing logic, such as multi-dimensional data analysis, reports, dashboards, and ETL, may benefit from Relational Cache.
  2. The cached data can be stored in memory, local disk, or any datasource supported by Spark. Temporarily cached data stored in memory can be accessed quickly, but cross-Spark Context sharing is not supported. For cache with a large amount of data, for example, the materialized view or cube built by many enterprises, may reach the PB level. In this case, Relational Cache is obviously more suitable to be stored in persistent distributed file systems, such as HDFS and OSS.
  3. Cached data can be used to optimize any subsequent user queries that can be optimized.

EMR Spark extends Spark to implement Relational Cache. Our work includes the following:

  1. Extending the Spark SQL DDLs. The existing CACHE syntax is extended to support the addition, deletion, modification, and query of any table/view cache.
  2. Enabling Metastore to support caching meta information. Persistently cached metadata management is supported by Metastore.
  3. Extending Spark Catalyst to support the Cache Based Optimizer, which optimizes execution plans for subsequent queries using in-memory or persistent caches.
  4. Choosing the cache based on CBO. Multiple caches may meet the requirements for rewriting the execution plan, so we need to choose the appropriate cache for rewriting the final execution plan.

Using Relational Cache

Let’s see how you can create a relational cache.

CACHE [LAZY] TABLE table_name
[USING datasource
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[ZORDER BY (col_name3, col_name4, ...)]
[CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]]
[AS select_statement]

The syntax for creating a cache is shown above. We can use this syntax to cache any Spark table or view, and support data formats (such as Json, Parquet, and ORC), data sources (such as HDFS, and OSS), and the organization of cached data (such as Partition, Bucket and Z-order).

In the above syntax, REFRESH ON (DEMAND || COMMIT) specifies how the cache is updated - whether it is automatically updated when the base table data is updated (in COMMIT mode), or whether the user manually triggers the update by updating DDL (in DEMAND mode). Next, (ENABLE | DISABLE) REWRITE specifies whether to allow the cache to be used for subsequent execution plan optimization.

In addition, EMR Spark provides and extends more Relational Cache-related DDLs for adding, deleting, modifying, and querying the cache.

REFRESH TABLE cache_name

EMR Spark also provides session-level parameters to control whether to enable Relational Cache-based execution plan optimization. Users can enable or disable execution plan optimization through the spark.sql.cache.queryRewrite parameter.

Next, you can optimize queries with Relational Cache. The following is a simple example of how the Relational Cache optimizes Spark queries. The original SQL query is:

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name

The corresponding physical execution plan includes two Join and two Aggregate operations, with an execution time of 16.9s, as shown below:

== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
+- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
+- *(6) Project [o_totalprice#10, n_name#36]
+- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
:- *(6) Project [o_totalprice#10, c_nationkey#30L]
: +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
: :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(o_custkey#8L, 200)
: : +- *(1) Project [o_custkey#8L, o_totalprice#10]
: : +- *(1) Filter isnotnull(o_custkey#8L)
: : +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
: +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c_custkey#27L, 200)
: +- *(3) Project [c_custkey#27L, c_nationkey#30L]
: +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
: +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(5) Project [n_nationkey#35L, n_name#36]
+- *(5) Filter isnotnull(n_nationkey#35L)
+- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>
Image for post
Image for post

A Relational Cache can be created in two ways. You can create a view first, and then cache the data of the view through cache syntax, as follows:

CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
CACHE TABLE nation_cust_cache
USING parquet;

Alternatively, you can directly create a view and cache data.

CACHE TABLE nation_cust_cache
USING parquet
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

After the data is cached, we re-execute the user query SQL. The execution plan is as follows:

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
+- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
+- *(1) Project [o_totalprice#20, n_name#35]
+- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
+- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
Image for post
Image for post

We can see that the cache-based optimized execution plan directly reads data from the cache, saving the computing time of two Join operations. The overall execution time also reduced from 16.9s to 1.9s.


The powerful functions of the Relational Cache give Spark more possibilities. With the Relational Cache, you can cache any Relational data, such as tables, views, and datasets, to any data source supported by Spark in advance, and can flexibly organize the cached data. As such, the Relational Cache can help you accelerate Spark data analysis in many application scenarios. In specific application scenarios, for example, aggregation analysis of multi-dimensional data in the Star model, a sub-second response of PB-level data can be achieved.

Original Source

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store