Use Relational Cache to Accelerate EMR Spark Data Analysis

How Relational Cache Works
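Relational Cache in EMR Spark works much like a materialized view. You declare a cache over a table or a query, EMR Spark pre-computes the result and persists it in a chosen data source (for example, Parquet files on HDFS), and the optimizer then transparently rewrites the physical plans of matching queries to read from the cache instead of recomputing the underlying joins and aggregations. The rest of this article walks through the SQL interface and a TPC-H example that shows the plan rewrite at work.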

Using Relational Cache
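Relational caches are managed through a small set of SQL statements. The full syntax of CACHE TABLE is: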

CACHE [LAZY] TABLE table_name
[REFRESH ON (DEMAND | COMMIT)]
[(ENABLE | DISABLE) REWRITE]
[USING datasource
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[ZORDER BY (col_name3, col_name4, ...)]
[CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]]
[AS select_statement]
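As a concrete sketch of this syntax, the statement below caches a pre-aggregated order summary as Parquet; the table and column names (orders_summary, order_date, cust_id, price) are illustrative, not from the original article:

-- Hypothetical example: pre-aggregate orders per day and customer,
-- store the result as Parquet, and allow automatic plan rewriting.
CACHE TABLE orders_summary
REFRESH ON COMMIT
ENABLE REWRITE
USING parquet
PARTITIONED BY (order_date)
AS SELECT order_date, cust_id, sum(price) AS total_price
   FROM orders
   GROUP BY order_date, cust_id;

The remaining statements manage the lifecycle of existing caches: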
UNCACHE TABLE [IF EXISTS] table_name
ALTER TABLE table_name (ENABLE | DISABLE) REWRITE
ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)
REFRESH TABLE cache_name
SHOW CACHES
(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name
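UNCACHE TABLE drops a cache, ALTER TABLE switches rewriting on or off or changes the refresh strategy, REFRESH TABLE rebuilds the cached data, SHOW CACHES lists the existing caches, and DESC/DESCRIBE shows the details of a cached table.

To see plan rewriting in action, consider the following aggregation over the TPC-H tables orders, customer, and nation, which sums order totals per nation: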
SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name
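Running EXPLAIN on this query before any cache exists yields a plan that joins all three tables, with two shuffle exchanges and a broadcast along the way: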
== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>
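The plan sort-merge joins orders with customer and then broadcast joins the result with nation, shuffling data twice along the way. To avoid repeating this work on every run, we can build a relational cache over the three-table join. One option is to define the join as a view and then cache the view: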
CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;
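Alternatively, the view definition and the cache can be created in a single statement: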
CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
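With the cache built and rewriting enabled, EXPLAIN on the same aggregation query now shows that the optimizer has rewritten the plan to read the cache directly: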
== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
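Both joins and their shuffle exchanges are gone, and the seven whole-stage-codegen stages have collapsed to two: the query reads pre-joined rows directly from the _cache_nation_cust_cache table and only performs the final aggregation.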

Summary
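Relational Cache brings materialized-view-style pre-computation to EMR Spark: declare a cache over a frequently used join or aggregation once, and matching queries are transparently rewritten to read the cached data instead of redoing the work. As the TPC-H example shows, this can strip shuffles and joins out of the physical plan entirely.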

Original Source
