Use Relational Cache to Accelerate EMR Spark in Data Analysis

How Relational Cache Works

Using Relational Cache

CACHE [LAZY] TABLE table_name
  [REFRESH ON (DEMAND | COMMIT)]
  [(ENABLE | DISABLE) REWRITE]
  [USING datasource
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [ZORDER BY (col_name3, col_name4, ...)]
    [CLUSTERED BY (col_name5, col_name6, ...) INTO num_buckets BUCKETS]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]]
  [AS select_statement]

UNCACHE TABLE [IF EXISTS] table_name

ALTER TABLE table_name (ENABLE | DISABLE) REWRITE

ALTER TABLE table_name REFRESH ON (DEMAND | COMMIT)

REFRESH TABLE cache_name

SHOW CACHES

(DESC | DESCRIBE) (EXTENDED | FORMATTED) table_name

SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name

== Physical Plan ==
*(7) HashAggregate(keys=[n_name#36], functions=[sum(o_totalprice#10)])
+- Exchange hashpartitioning(n_name#36, 200)
   +- *(6) HashAggregate(keys=[n_name#36], functions=[partial_sum(o_totalprice#10)])
      +- *(6) Project [o_totalprice#10, n_name#36]
         +- *(6) BroadcastHashJoin [c_nationkey#30L], [n_nationkey#35L], Inner, BuildRight
            :- *(6) Project [o_totalprice#10, c_nationkey#30L]
            :  +- *(6) SortMergeJoin [o_custkey#8L], [c_custkey#27L], Inner
            :     :- *(2) Sort [o_custkey#8L ASC NULLS FIRST], false, 0
            :     :  +- Exchange hashpartitioning(o_custkey#8L, 200)
            :     :     +- *(1) Project [o_custkey#8L, o_totalprice#10]
            :     :        +- *(1) Filter isnotnull(o_custkey#8L)
            :     :           +- *(1) FileScan parquet tpch_sf100_parquet.orders[o_custkey#8L,o_totalprice#10,o_orderdate#15] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/orders], PartitionCount: 2406, PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey)], ReadSchema: struct<o_custkey:bigint,o_totalprice:double>
            :     +- *(4) Sort [c_custkey#27L ASC NULLS FIRST], false, 0
            :        +- Exchange hashpartitioning(c_custkey#27L, 200)
            :           +- *(3) Project [c_custkey#27L, c_nationkey#30L]
            :              +- *(3) Filter (isnotnull(c_custkey#27L) && isnotnull(c_nationkey#30L))
            :                 +- *(3) FileScan parquet tpch_sf100_parquet.customer[c_custkey#27L,c_nationkey#30L,c_mktsegment#34] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/customer], PartitionCount: 5, PartitionFilters: [], PushedFilters: [IsNotNull(c_custkey), IsNotNull(c_nationkey)], ReadSchema: struct<c_custkey:bigint,c_nationkey:bigint>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               +- *(5) Project [n_nationkey#35L, n_name#36]
                  +- *(5) Filter isnotnull(n_nationkey#35L)
                     +- *(5) FileScan parquet tpch_sf100_parquet.nation[n_nationkey#35L,n_name#36] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://emr-header-1:9000/tpch/sf100_parquet/tpch/sf100_parquet/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey)], ReadSchema: struct<n_nationkey:bigint,n_name:string>

-- Option 1: define a view, then cache it.
CREATE VIEW nation_cust_cache AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;

CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet;

-- Option 2 (alternative to option 1): create the cache directly from the query.
CACHE TABLE nation_cust_cache
ENABLE REWRITE
USING parquet
AS
SELECT n_name, o_custkey, c_custkey, n_nationkey, c_nationkey, o_totalprice, o_orderstatus, o_orderdate
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey;
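
One way to check whether the optimizer answers a query from the cache is to prefix the query with EXPLAIN and inspect the FileScan node of the resulting plan. The sketch below assumes that nation_cust_cache has been created as above with rewriting enabled. EXPLAIN is standard Spark SQL, not something specific to Relational Cache.

```sql
-- Print the physical plan of the aggregation query without executing it.
-- If the rewrite applies, the plan should scan the cache table
-- instead of joining orders, customer, and nation.
EXPLAIN
SELECT n_name, sum(o_totalprice)
FROM orders, customer, nation
WHERE o_custkey = c_custkey AND c_nationkey = n_nationkey
GROUP BY n_name;
```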

== Physical Plan ==
*(2) HashAggregate(keys=[n_name#35], functions=[sum(o_totalprice#20)])
+- Exchange hashpartitioning(n_name#35, 200)
   +- *(1) HashAggregate(keys=[n_name#35], functions=[partial_sum(o_totalprice#20)])
      +- *(1) Project [o_totalprice#20, n_name#35]
         +- *(1) Filter (((isnotnull(o_custkey#18L) && isnotnull(c_custkey#26L)) && isnotnull(c_nationkey#29L)) && isnotnull(n_nationkey#34L))
            +- *(1) FileScan parquet tpch_sf100_parquet._cache_nation_cust_cache[n_name#35,o_custkey#18L,c_custkey#26L,n_nationkey#34L,c_nationkey#29L,o_totalprice#20] Batched: true, Format: Parquet, Location: FullScanFileMetaWithStats[hdfs://emr-header-1.cluster-100048:9000/user/hive/warehouse/tpch_sf100_..., PartitionFilters: [], PushedFilters: [IsNotNull(o_custkey), IsNotNull(c_custkey), IsNotNull(c_nationkey), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,o_custkey:bigint,c_custkey:bigint,n_nationkey:bigint,c_nationkey:bigint,o_to...
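
Once the cache exists, the management statements from the syntax listing can be applied to it. The following is a minimal sketch that only instantiates that grammar for nation_cust_cache. The comments describe what each statement is presumed to do based on its name and are assumptions, not verified behavior. The statements are meant to be run individually as needed, not as one script.

```sql
-- List the existing caches.
SHOW CACHES;

-- Show detailed metadata for the cache.
DESC FORMATTED nation_cust_cache;

-- Stop the optimizer from rewriting queries against this cache
-- (assumed effect), for example while its data is known to be stale.
ALTER TABLE nation_cust_cache DISABLE REWRITE;

-- Refresh the cached data, then allow rewrites again.
REFRESH TABLE nation_cust_cache;
ALTER TABLE nation_cust_cache ENABLE REWRITE;

-- Change the refresh policy so the cache is refreshed only on request
-- (assumed meaning of ON DEMAND).
ALTER TABLE nation_cust_cache REFRESH ON DEMAND;

-- Remove the cache when it is no longer needed.
UNCACHE TABLE IF EXISTS nation_cust_cache;
```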

Summary

Original Source

Alibaba Cloud
