Building a Cloud-Native Feed Streaming System with Apache Kafka and Spark on Alibaba Cloud — Part B: Streaming Processing

  • Part A: Step-by-Step Guide on Feed Stream System Service Setup on Alibaba Cloud
  • Part B: Demo Solution on Building the Spark Streaming Project for Streaming Processing
  • Part C: Demo Solution on Building the Spark MLlib for Intelligent Profile Tagging

Data Lake Analytics (DLA) Serverless Spark Overview

  • The value of spark.dla.eni.enable is true, which indicates that access to your VPC is enabled.
  • The value of spark.dla.eni.vswitch.id is the ID of the VSwitch used by your ECS instance.
  • The value of spark.dla.eni.security.group.id is the ID of the security group used by your ECS instance.

Get Ready for the Spark-SQL Command-Line Tool

wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit_1/dla-spark-toolkit.tar.gz
tar zxvf dla-spark-toolkit.tar.gz
#  cluster information
# AccessKeyId
keyId = [AccessKeyId]
# AccessKeySecret
secretId = [AccessKeySecret]
# RegionId
regionId = [RegionId]
# set spark virtual cluster vcName
vcName = [vcName]
# set OssUploadPath, if you need upload local resource
ossUploadPath = [ossUploadPath]
##spark conf
# driver specifications : small 1c4g | medium 2c8g | large 4c16g
spark.driver.resourceSpec = small
# executor instance number
spark.executor.instances = 2
# executor specifications : small 1c4g | medium 2c8g | large 4c16g
spark.executor.resourceSpec = small
# when use ram, role arn
#spark.dla.roleArn =
# when use option -f or -e, set catalog implementation
#spark.sql.catalogImplementation =
# config dla oss connectors
# spark.dla.connectors = oss
# config eni, if you want to use eni
spark.dla.eni.enable = true
spark.dla.eni.vswitch.id = [vswitch ID from your ecs]
spark.dla.eni.security.group.id = [security group ID from your ecs]
# config spark read dla table
#spark.sql.hive.metastore.version = dla
./bin/spark-submit --verbose --class [class.name] --name [task.name]  --files [oss path log4j.property file] [spark jar] [arg0] [arg1] ...

Developing the DLA Demo Solution on the Feed Streams System

Figure 1. Feed Streams Processor Solution Overview
  1. Publishing financial news updates to Kafka MQ
  2. Subscribing to Kafka MQ
  3. Enriching the financial news with reference data and transforming it into financial records
  4. Saving the financial records into HBase
  5. Publishing the financial records into Kafka MQ

Step 1. Design and Provision the HBase Table with Phoenix

Figure 2. DBeaver Setup for Apache Phoenix Database Connection
Figure 3. DBeaver Setup for Apache Phoenix Database Connection — Properties
CREATE TABLE USER_TEST.STOCK (
  ROWKEY VARCHAR PRIMARY KEY,
  "STOCK_CODE" VARCHAR,
  "STOCK_NAME" VARCHAR,
  "LAST_PRICE" DECIMAL(12,4),
  "TIMESTAMP" TIMESTAMP
) SALT_BUCKETS = 16;

CREATE INDEX STOCK_CODE_INDEX ON USER_TEST.STOCK ("STOCK_CODE") SALT_BUCKETS = 16;

SELECT * FROM USER_TEST.STOCK
WHERE "STOCK_CODE" = '2318.HK' -- filtering on the secondary index
ORDER BY "TIMESTAMP" DESC
;
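
Once the table and the secondary index are in place, you can sanity-check them from Spark as well as from DBeaver. The sketch below is not part of the original project: it reads the table back through the same phoenix-spark connector used by the writer in Step 5, and the ZooKeeper address zk1:2181 is a placeholder for your HBase cluster's quorum.

import org.apache.spark.sql.SparkSession

object PhoenixReadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("phoenix-read-check").getOrCreate()

    // Read USER_TEST.STOCK via the phoenix-spark connector;
    // "zk1:2181" is a placeholder for the HBase ZooKeeper quorum
    val stocks = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", "USER_TEST.STOCK")
      .option("zkUrl", "zk1:2181")
      .load()

    // Filter on the indexed column, mirroring the SELECT above
    stocks.filter("STOCK_CODE = '2318.HK'").show()
  }
}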

Step 2. Load the Reference Dataset Into MongoDB
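
Step 5 reads the reference data back with MongoSpark.load, so the simplest way to load it in the first place is through the same connector. The snippet below is only a sketch: the MongoDB URI, database and collection names, and the sample rows are placeholders, while the field names (symbol, CompanyName, LastPrice, Volume) match the columns selected in Step 5.

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

object LoadReferenceData {
  def main(args: Array[String]): Unit = {
    // Placeholder output URI: mongodb://<user>:<password>@<host>:<port>/<database>.<collection>
    val spark = SparkSession.builder()
      .appName("load-reference-data")
      .config("spark.mongodb.output.uri", "mongodb://user:password@mongo-host:3717/feeds.stockRef")
      .getOrCreate()

    import spark.implicits._

    // Dummy reference rows; real data would come from your market reference file
    val refData = Seq(
      ("2318.HK", "Ping An Insurance", 48.05, 12345678L),
      ("0700.HK", "Tencent Holdings", 321.20, 23456789L)
    ).toDF("symbol", "CompanyName", "LastPrice", "Volume")

    // Write the DataFrame into the configured MongoDB collection
    MongoSpark.save(refData)
  }
}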

Step 3. Set up the Scala Project With Maven

Step 4. Simulating the Publishing of Financial Updates

import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

val kafkaProducer = new KafkaProducer[String, String](props)

// Publish one simulated update per second: key = simulated source IP,
// value = "<stock code>,<epoch millis>"
for (nEvents <- Range(0, events)) {
  val runtime = new Date().getTime()
  val ip = "192.168.2." + rnd.nextInt(255)
  val msg = SKUs.HSIStocks(rnd.nextInt(SKUs.HSIStocks.length - 1)) + "," + runtime
  val data = new ProducerRecord[String, String](topicName, ip, msg)
  kafkaProducer.send(data)
  Thread.sleep(1000)
}
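
The producer snippet above refers to several names defined elsewhere in the project and not shown here: SKUs.HSIStocks, rnd, events, topicName, and bootstrapServers. Purely for illustration, a hypothetical set of definitions could look like this:

import scala.util.Random

// Hypothetical placeholders for the values the producer snippet expects
object SKUs {
  // A few Hang Seng Index constituents used as simulated stock codes
  val HSIStocks: Array[String] = Array("0700.HK", "2318.HK", "0005.HK", "0941.HK", "1299.HK")
}

val rnd = new Random()
val events = 100 // number of simulated updates to publish
val topicName = "financial-updates" // placeholder Kafka topic name
val bootstrapServers = "alikafka-broker-1:9092,alikafka-broker-2:9092" // placeholder broker list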

Step 5. Simulating the Subscription and Enrichment of Financial Updates

  1. Loading static reference data from MongoDB with MongoSpark.load(sparkSession, loadMongo())...
  2. Loading streaming financial updates from Kafka with SparkSession.readStream.format("kafka")...
  3. Joining the streaming financial updates with the static reference data
  4. Saving the joined dataset into HBase

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.DataFrame

val sparkSession = loadConfig()

// Static reference data from MongoDB (company name, last price, volume per symbol)
val dfStatic = MongoSpark.load(sparkSession, loadMongo())
  .selectExpr("symbol as Symbol", "CompanyName", "LastPrice", "Volume")

// Streaming financial updates from Kafka; each value is "<stock code>,<epoch millis>"
val dfStream = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("group.id", groupId)
  .load()
  .selectExpr(
    "CAST(key AS STRING) as SourceIP",
    "split(value, ',')[0] as StockCode",
    "CAST(split(value, ',')[1] AS long)/1000 as TS")

// Enrich the stream with the reference data and shape it for the Phoenix table
val dfJoin = dfStream
  .join(dfStatic, dfStream("StockCode") === dfStatic("Symbol"), "leftouter")
  .selectExpr(
    "concat_ws('|', nvl(SourceIP, ''), nvl(StockCode, '')) as ROWKEY",
    "StockCode as stock_code",
    "CompanyName as stock_name",
    "LastPrice as last_price",
    "from_unixtime(TS) as timestamp")

SparkHBaseWriter.open()

// Write each micro-batch of the enriched stream into HBase via Phoenix
val query = dfJoin
  .writeStream
  .foreachBatch { (output: DataFrame, batchId: Long) =>
    SparkHBaseWriter.process(output)
  }
  .start
  .awaitTermination()

SparkHBaseWriter.close()
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{DataFrame, SaveMode}

object SparkHBaseWriter {

  var address: String = null
  var zkport: String = null
  var driver: String = null

  // Read the HBase ZooKeeper connection settings from the application config
  def open(): Unit = {
    val hbaseConf = ConfigFactory.load().getConfig("com.alibaba-inc.cwchan").getConfig("HBase")
    address = hbaseConf.getString("zookeeper")
    zkport = hbaseConf.getString("zkport")
    driver = "org.apache.phoenix.spark"
  }

  def close(): Unit = {
  }

  // Write one micro-batch into the Phoenix table USER_TEST.STOCK
  def process(record: DataFrame): Unit = {
    record.write
      .format(driver)
      .mode(SaveMode.Overwrite)
      .option("zkUrl", address.split(",")(0) + ":" + zkport)
      .option("table", "USER_TEST.STOCK")
      .save()
  }
}
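
The writer above covers step 4 of the overview (saving the records into HBase). Step 5 of Figure 1, publishing the financial records back into Kafka MQ, is not shown in this excerpt; a minimal sketch using Spark's built-in Kafka sink could look like the following, where the output topic name and checkpoint location are placeholders rather than values from the original project. In practice this query would be started alongside the HBase writer before blocking on awaitTermination().

// Publish the enriched records to a downstream Kafka topic (Figure 1, item 5).
// "financial-records" and the checkpoint path are placeholders.
val kafkaSink = dfJoin
  .selectExpr("ROWKEY AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("topic", "financial-records")
  .option("checkpointLocation", "/tmp/checkpoints/financial-records")
  .start()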

Step 6. Running in Local Standalone Mode with an IDE (e.g., Eclipse)
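
The loadConfig() helper used in Step 5 is not shown in this excerpt; when running inside the IDE it only needs to return a SparkSession bound to the local master, so the whole pipeline runs in a single JVM. A hypothetical local-mode version could look like this:

import org.apache.spark.sql.SparkSession

// Hypothetical local-mode variant of loadConfig(); the original implementation is not shown
def loadConfig(): SparkSession = {
  SparkSession.builder()
    .appName("feed-stream-processor")
    .master("local[*]") // run driver and executors inside the IDE's JVM
    .getOrCreate()
}

With the application running locally, the following Phoenix queries confirm that enriched records are landing in the USER_TEST.STOCK table.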

SELECT * FROM USER_TEST.STOCK
ORDER BY "TIMESTAMP" DESC
;
SELECT * FROM USER_TEST.STOCK
WHERE "STOCK_CODE" = '2318.HK'
ORDER BY "TIMESTAMP" DESC
;

Step 7. Running in Cluster Mode in DLA at Alibaba Cloud

Figure 4. Compile the Scala Project Into an Uber Jar for Deployment
[root@centos7 dla-spark-toolkit]# pwd
/root/dla-spark-toolkit
[root@centos7 dla-spark-toolkit]# ./bin/spark-submit --verbose --class com.alibabacloud.cwchan.SparkApp --name spark-kafka-test --files oss://dla-spark-hk/dla-jars/log4j.properties /root/dla-spark-demo-0.0.1-SNAPSHOT-shaded.jar run sub

Go to the DLA Management Console to Access the SparkUI

Figure 5. Data Lake Analytics Management Page
Figure 6. SparkUI of DLA
