Drilling into Big Data - Data Preparation (5)


By Priyankaa Arunachalam, Alibaba Cloud Tech Share Author. Tech Share is Alibaba Cloud’s incentive program to encourage the sharing of technical knowledge and best practices within the cloud community.

We introduced Hadoop and clusters in the previous article. At this point, you may be wondering why we need a multi-node cluster at all: since most of the services run on the master, why not just create a single-node cluster instead? There are two main reasons for using a multi-node cluster. First, the amount of data to be stored and processed can be too large for a single node to handle. Second, the computational power of a single node is limited.

Data Preparation

As discussed in the previous articles, we often collect data for a reason but then leave it as is, without any analysis. For a business, there is little value in obtaining data and simply keeping it. Data preparation means transforming raw data into refined information that can be used effectively for analysis and other business purposes. Our ultimate goal is to turn data into information and information into insight, which supports decision making and business improvement. Data processing or preparation is not a new idea; it has been around since the days when processing was done manually. Now that data has become big, it is time to process it by automated means to save time and achieve better accuracy.

If you browse for the top five big data processing frameworks, you will typically find the following names:

  • Hadoop
  • Spark
  • Storm
  • Flink
  • Samza

The first two of these five frameworks are the best known and the most widely implemented across projects, and both are primarily batch processing frameworks. They may seem similar, but there are significant differences between them. Let's take a quick look at a comparative analysis.

| Criteria | Spark | Hadoop MapReduce |
| --- | --- | --- |
| Processing | In-memory | Persists to disk after the map and reduce phases |
| Ease of use | Easier, thanks to Scala and Python support | Harder, as only Java is supported |
| Speed | Runs applications up to 100 times faster | Slower |
| Latency | Lower | Higher |
| Task scheduling | Schedules tasks by itself | Requires external schedulers |

According to the table, several factors motivate the move from MapReduce to Spark. Another simple reason is ease of use: Spark comes with user-friendly APIs for Scala, Java, and Python, plus Spark SQL. Spark SQL is close to SQL-92, so it is easy even for beginners. Some of the key features that make Spark a strong big data processing engine are:

  • Equipped with MLlib, a built-in machine learning library
  • Supports multiple languages such as Scala, Python and Java
  • A single engine covers SQL, graph analytics and streaming
  • Stores data in the servers' RAM, which makes access easier and analytics faster
  • Real-time processing
  • Compatible with Hadoop (works independently or on top of Hadoop)

Spark over Storm

We compared the first two and arrived at a choice. Some people may prefer the third stack, Storm. Both Spark and Storm are common stacks for real-time processing and analytics. Storm is a pure streaming framework, but as a smaller framework it lacks many features such as MLlib. Spark is also preferred over Storm for operational details such as scaling services up and down. It is worth knowing these differences so you can switch between tools based on your requirements. In this article, we will focus on Spark, a widely used processing tool.

Components of Spark

Apache Spark Ecosystem

  • Spark Core — the base of the stack: a general execution engine used for dispatching and scheduling tasks and for basic I/O.
  • Spark SQL — a component on top of Spark Core that introduces a data abstraction called SchemaRDD (today's DataFrame) and supports both structured and semi-structured data (a minimal example follows this list).
  • Spark Streaming — enables fault-tolerant processing of live data streams and provides an API to manipulate those streams.
  • MLlib (Machine Learning Library) — ships with Spark and contains a wide array of machine learning algorithms, collaborative filtering, and more.
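
To make the Spark SQL component concrete, here is a minimal PySpark sketch; the session settings, museum names and numbers are purely illustrative and not taken from the dataset used later.

from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session
spark = SparkSession.builder.master("local[*]").appName("components-demo").getOrCreate()

# Build a small DataFrame from in-memory rows (illustrative values)
df = spark.createDataFrame(
    [("Museum A", "State X", 120), ("Museum B", "State X", 80), ("Museum C", "State Y", 95)],
    ["MuseumName", "State", "Visitors"])

# Register it as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("museums")
spark.sql("select State, count(*) as museum_count from museums group by State").show()

spark.stop()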

Applications

Some applications of Apache Spark are

  • Machine Learning — Apache Spark ships with a machine learning library, MLlib, which can perform advanced analytics such as clustering and classification (a small sketch follows this list).
  • Event detection — Spark Streaming lets an organization keep track of rare and unusual behaviors in order to protect its systems.
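
As a rough illustration of the MLlib API, here is a tiny clustering sketch; the feature vectors are made up and only meant to show the shape of the code.

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("mllib-demo").getOrCreate()

# Toy feature vectors; in practice these would come from prepared data
data = spark.createDataFrame(
    [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
     (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)],
    ["features"])

# Group the points into two clusters and show the assignments
model = KMeans(k=2, seed=1).fit(data)
model.transform(data).show()

spark.stop()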

Spark Compatibility

  • File formats: Spark supports all the file formats supported by Hadoop, from unstructured formats such as plain text to structured ones such as SequenceFiles. As discussed earlier, choosing an appropriate file format can give better performance.
  • File systems: local, Amazon S3, HDFS, and so on (a short read sketch follows this list).
  • Databases: supports many, such as Cassandra, HBase, and Elasticsearch, with the help of Hadoop connectors and custom Spark connectors.
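
For instance, here is a minimal PySpark sketch of reading from different storage layers; the paths are placeholders, and the S3 line assumes the S3 connector and credentials are already configured on the cluster.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compat-demo").getOrCreate()

# Plain text from the local file system (placeholder path)
local_txt = spark.read.text("file:///tmp/sample.txt")

# CSV from HDFS (placeholder path)
hdfs_csv = spark.read.option("header", "true").csv("hdfs:///user/demo/sample.csv")

# Parquet from Amazon S3 (placeholder bucket; needs the S3 connector and credentials)
# s3_parquet = spark.read.parquet("s3a://my-bucket/sample.parquet")

local_txt.show(5)
hdfs_csv.printSchema()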

Usage

Spark is a powerful tool that provides an interactive shell for analyzing data interactively. The points below cover opening, using, and closing a Spark shell.

Opening Spark Shell

Generally, Spark is built using Scala, and the default interactive shell is a Scala shell. Type the following command to start it.

$ spark-shell

If the Spark shell opens successfully, you will see startup output ending with the line "Spark context available as sc", which means Spark has automatically created a SparkContext object named sc. If it is not there, create a SparkContext object before you start.

Now you are all set to run Scala programs in the shell.

Type :quit (or press Ctrl+D) to exit the Spark shell when needed.

Spark context

The SparkContext can connect to several types of cluster managers, which allocate resources across applications (a short illustration follows). Let me show two different scenarios with two different languages.
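
Before the full Scala example, here is a minimal PySpark sketch of pointing a SparkSession (and hence its SparkContext) at different cluster managers; the master URLs are standard Spark options, and the YARN line assumes the usual Hadoop configuration is available.

from pyspark.sql import SparkSession

# Local mode: everything runs in a single JVM, which is handy for testing
spark = SparkSession.builder.master("local[*]").appName("ctx-demo").getOrCreate()
print(spark.sparkContext.master)
spark.stop()

# On a cluster, point the session at a cluster manager instead, for example:
# spark = SparkSession.builder.master("yarn").appName("ctx-demo").getOrCreate()
# spark = SparkSession.builder.master("spark://<master-host>:7077").appName("ctx-demo").getOrCreate()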

Let's find the museum count by state from the data we ingested, using Scala, and write the output back to Hadoop as a CSV file.

import java.io.PrintWriter
import java.util.Calendar
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object museum {
  def main(args: Array[String]) {
    // Log the start time
    println(Calendar.getInstance().getTime())
    var cols = ""

    // Create (or reuse) a local SparkSession
    val spark1 = SparkSession.builder.master("local").appName("Spark Avro Reader").getOrCreate

    // Read the merged TripAdvisor CSV from HDFS; "escape" keeps quoted commas
    // (e.g. in the Address column) from being split into extra columns
    var df1 = spark1.read.format("com.databricks.spark.csv")
      .option("header", "True")
      .option("escape", "\"")
      .load("hdfs://emr-header-1.cluster-95904:9000/user/demo/tripadvisor_merged.csv")
      .coalesce(1)

    // Register the DataFrame as a temporary view and aggregate with Spark SQL
    df1.createOrReplaceTempView("museum")
    val df2 = spark1.sql("select State, count(MuseumName) Museum_Count from museum group by State")

    var flag = 0
    // Because of coalesce(1) there is a single partition, so this writes one file
    df2.foreachPartition(itr => {
      val conf = new Configuration()
      conf.set("fs.defaultFS", "hdfs://emr-header-1.cluster-95904:9000")
      val fs = FileSystem.get(conf)
      val output = fs.create(new Path("/user/ogs/etl/processed/MUSEUM_COUNT_BY_STATE/MUSEUM_COUNT_BY_STATE.csv"))
      val pw1 = new PrintWriter(output)

      // Write the header row once
      if (flag == 0) { cols = "State" + "," + "Count" + "\n"; pw1.write(cols); flag = 1 }

      // Write each row, stripping the surrounding brackets from Row.toString
      while (itr.hasNext) {
        val item = itr.next().toString()
        val l = item.length
        cols = item.substring(1, l - 1)
        cols = cols.concat("\n")
        pw1.write(cols)
      }
      pw1.close()
    })

    spark1.stop()
  }
}

Here Spark reads the file as a comma-separated file, but the Address column in this sheet contains commas of its own. To avoid splitting those values into separate columns, we set the "escape" option. Scala depends on Java-style APIs, so there are quite a few libraries to import. Let's shorten this using pyspark.

Before initiating Spark with Python, install the needed libraries. Here I am installing pandas, which is useful for efficient file handling.
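
A minimal sketch, assuming pip is available on the node where the pyspark shell will run:

$ pip install pandas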

Now initiate the shell using the "pyspark" command.

Let's find out the top museums by visitor count. The following code makes use of Spark SQL and the conventions of the pyspark shell, where the SparkSession is already available as spark. You could also read and process the file with a pandas DataFrame, but using Spark's own reading and writing paths is more efficient for large data.

import pandas as pd
from pyspark.sql import SparkSession
# In the pyspark shell the SparkSession is already available as "spark"

# Read the merged TripAdvisor CSV from HDFS; "escape" protects quoted commas
df = spark.read.format("com.databricks.spark.csv").option("header","True").option("escape", "\"").load("hdfs://emr-header-1.cluster-95904:9000/user/demo/sqoop/tripadvisor_merged.csv")
df.createOrReplaceTempView("family")

from pyspark.sql.functions import lit

# Rank each visitor type; ordering by length first gives a numeric sort on string columns
df1 = spark.sql("select MuseumName,Families_Count Count from (select MuseumName,Families_Count,rank() over(order by length(Families_Count) desc, Families_Count desc) rank from family) where rank <=30").withColumn("Visitor_Type", lit("Families_Count"))
df2 = spark.sql("select MuseumName ,Couples_Count Count from (select MuseumName,Couples_Count,rank() over(order by length(Couples_Count) desc,Couples_Count desc) rank from family) where rank <=10").withColumn("Visitor_Type", lit("Couples_Count"))
df3 = spark.sql("select MuseumName,Solo_Count Count from (select MuseumName,Solo_Count,rank() over(order by length(Solo_Count) desc,Solo_Count desc) rank from family) where rank <=10").withColumn("Visitor_Type", lit("Solo_Count"))
df4 = spark.sql("select MuseumName,Business_Count Count from (select MuseumName,Business_Count,rank() over(order by length(Business_Count) desc,Business_Count desc) rank from family) where rank <=10").withColumn("Visitor_Type", lit("Business_Count"))
df5 = spark.sql("select MuseumName,Friends_Count Count from (select MuseumName,Friends_Count,rank() over(order by length(Friends_Count) desc,Friends_Count desc) rank from family) where rank <=10").withColumn("Visitor_Type", lit("Friends_Count"))

# Stack the per-type rankings into one DataFrame and write it back to HDFS
df6 = df1.unionAll(df2).unionAll(df3).unionAll(df4).unionAll(df5)
df6.write.csv('/user/demo/spark/top_museums_by_count.csv')

We can also save this script with a .py extension and submit it as an application using spark-submit. Since we needed several different counts, we created separate DataFrames and merged them with union. Note that the counts are stored as strings, so an ordinary sort orders them by their first digit rather than by their numeric value, and the result would look something like the below.

In this case, include the length of the column in the ordering as well to get exact results.

For example,

df1 = spark.sql ("select MuseumName, Families_Count from family order by length(Families_Count) desc, Families_Count desc")
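
An equivalent option, not used in the original script, is to cast the string column to an integer before ordering; this assumes the counts are clean integer strings.

df1 = spark.sql("select MuseumName, Families_Count from family order by cast(Families_Count as int) desc")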

Once done, write the Spark DataFrame out as a CSV file. By default, the output is saved as multiple part-*.csv files under the provided path. If we query the folder we wrote to, we see "top_museums.csv", which is not a CSV file but a directory in which the output is saved in multiple parts. This folder-as-output structure plays a major role in distributed storage and processing.

Suppose I have to save a DataFrame with:

  • a path that maps to an exact file name instead of a folder
  • a single output file instead of multiple part files

Then coalesce the DataFrame down to a single partition before saving it, as sketched below.
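
Here is a minimal sketch of that idea; the DataFrame and paths are illustrative stand-ins for df6 above, and the pandas route only suits results small enough to fit on the driver.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("single-file-demo").getOrCreate()

# Tiny illustrative DataFrame standing in for df6 from the previous step
df6 = spark.createDataFrame([("Museum A", "1200"), ("Museum B", "1100")], ["MuseumName", "Count"])

# coalesce(1) produces one part file, but Spark still writes it inside a directory of that name
df6.coalesce(1).write.mode("overwrite").option("header", "true").csv("/tmp/top_museums_single")

# For small results, collecting to pandas gives an exact, single local file name
df6.toPandas().to_csv("/tmp/top_museums.csv", index=False)

spark.stop()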

Benefits of Spark on Alibaba Cloud

Adaptive execution

Spark SQL on Alibaba Cloud E-MapReduce supports adaptive execution, which sets the number of reduce tasks automatically and mitigates data skew on its own. By setting a range for the shuffle partition number, the adaptive execution framework of Spark SQL can dynamically adjust the number of reduce tasks at different stages of different jobs.
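
As a rough illustration, adaptive execution is driven by Spark SQL configuration; the exact property names vary between Spark and EMR versions, so treat the keys below as assumptions to verify against your cluster's documentation.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("adaptive-demo").getOrCreate()

# Turn adaptive execution on
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Assumed keys for the shuffle-partition range used by the adaptive framework
spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "1")
spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "500")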

Data skew

Data skew refers to the scenario where certain tasks process far more data than others. Open-source Spark SQL does not optimize for skewed data on its own; the adaptive execution framework addresses this by automatically detecting skewed data and applying runtime optimizations.

Best Practices

  • Do not collect large RDDs to the driver, and prefer the Dataset/DataFrame API over raw RDDs where possible.
  • Avoid UDFs and replace them with built-in Spark SQL functions where you can (see the sketch after this list).
  • When executing HDFS read/write jobs, set the number of concurrent tasks per executor to at most 5 for reading and writing.
  • A general tip is to look at the execution time of each job and tune it accordingly.
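
For example, here is a minimal sketch of the UDF point; the column and data are illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, upper
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local[*]").appName("udf-demo").getOrCreate()
df = spark.createDataFrame([("museum a",), ("museum b",)], ["MuseumName"])

# Python UDF: every row is serialized to Python and back, which is slow
to_upper_udf = udf(lambda s: s.upper() if s else None, StringType())
df.withColumn("upper_udf", to_upper_udf("MuseumName")).show()

# Built-in Spark SQL function: runs inside the JVM, where the optimizer can see it
df.withColumn("upper_builtin", upper("MuseumName")).show()

spark.stop()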

Hope you enjoyed learning about Spark. Our next steps are to explore creating and submitting various jobs through the Alibaba Cloud UI, as well as performing querying and analysis. In the next article, we will walk you through the basics of Hive, including table creation and other underlying concepts for big data applications.

“The goal is to turn data into information and information into insight.” (Carly Fiorina)

Reference: https://www.alibabacloud.com/blog/drilling-into-big-data-data-preparation-5_594671?spm=a2c41.12760976.0.0
