Using PostgreSQL for Real-Time IoT Stream Processing Applications

Published in Alibaba Cloud DataSeries · Oct 14, 2019 · 14 min read

By Digoal

Background

A defining feature of the Internet of Things (IoT) is that everything is interconnected and generates huge amounts of data. Some typical applications of stream processing include:

  • Drug supervision: From production, to transportation, to pharmacies, to sales, the information on each box of medicine is recorded on every node that it passes through.
  • Personal health and infrastructure monitoring: For example, health wristbands, location tracking watches for children, sensors for animal migration research (such as the Chinese sturgeon), water pattern monitoring, power grid monitoring, gas pipeline monitoring, and meteorological monitoring.
  • Real-time financial data monitoring: Real-time stock price prediction.
  • IoV: Real-time traffic statistics and real-time merging of vehicle trajectories. For example, real-time trajectory monitoring and alarms, real-time stop alarms, real-time vehicle trajectory deviation alarms, and real-time alarms for deviations between the vehicle odometer and actual mileage.
  • Smart shopping malls: Real-time customer flow statistics in shopping malls.
  • IT infrastructure monitoring: Real-time data monitoring, such as database monitoring, server monitoring, and operating system monitoring. Diversified sensors are deployed, and the sensors collect massive volumes of data.

In these scenarios, the data volume is often larger than that generated during Alibaba’s Double 11 Shopping Festival. How should we process such massive data? How can we implement real-time streaming data processing?

In this article, we will take a look at how PostgreSQL can be used for stream processing in IoT applications to handle trillions of data records per day in real time. The PostgreSQL ecosystem provides a good stream-based data processing product, PipelineDB, with a real-time computing capability of up to 100,000 records per second on a single common x86 server.

Setting up a Streaming Application with PipelineDB

Download and install PipelineDB, which is a PostgreSQL-based streaming data processing database.

# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm    
# rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm --prefix=/home/digoal/pipelinedb

Configure the environment variable script.

$vi env_pipe.sh     

export PS1="$USER@`/bin/hostname -s`-> "
export PGPORT=1922
export PGDATA=/disk1/digoal/pipe/pg_root
export LANG=en_US.utf8
export PGHOME=/home/digoal/pipelinedb
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH
export DATE=`date +"%Y%m%d%H%M"`
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.
export MANPATH=$PGHOME/share/man:$MANPATH
export PGHOST=$PGDATA
export PGUSER=postgres
export PGDATABASE=pipeline
alias rm='rm -i'
alias ll='ls -lh'
unalias vi

$ . ./env_pipe.sh

Initialize the database.

$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W

Configure parameters.

$ cd $PGDATA    
$ vi pipelinedb.conf
listen_addresses = '0.0.0.0' # what IP address(es) to listen on;
port = 1922 # (change requires restart)
max_connections = 200 # (change requires restart)
unix_socket_directories = '.' # comma-separated list of directories
shared_buffers = 8GB # min 128kB
maintenance_work_mem = 640MB # min 1MB
dynamic_shared_memory_type = posix # the default is the first option
synchronous_commit = off # synchronization level;
wal_buffers = 16MB # min 32kB, -1 sets based on shared_buffers
wal_writer_delay = 10ms # 1-10000 milliseconds
checkpoint_segments = 400 # in logfile segments, min 1, 16MB each
log_destination = 'csvlog' # Valid values are combinations of
logging_collector = on # Enable capturing of stderr and csvlog
log_timezone = 'PRC'
datestyle = 'iso, mdy'
timezone = 'PRC'
lc_messages = 'C' # locale for system error message
lc_monetary = 'C' # locale for monetary formatting
lc_numeric = 'C' # locale for number formatting
lc_time = 'C' # locale for time formatting
default_text_search_config = 'pg_catalog.english'
continuous_query_combiner_work_mem = 1GB
continuous_query_batch_size = 100000
continuous_query_num_combiners = 8
continuous_query_num_workers = 4
continuous_queries_enabled = on

Start the database.

$ pipeline-ctl start

Scenario 1: Sensor Data Analysis

Assume that each sensor uploads three fields: sensor ID, time, and sample value.

gid, crt_time, val

The application needs to collect real-time statistics to obtain the maximum value, minimum value, average value, and count of values uploaded by each sensor every minute, every hour, and every day.

Create three continuous views, each of which represents a statistical dimension.

The procedure is as follows.

Create continuous views that consume data from the stream.

pipeline=# CREATE CONTINUOUS VIEW sv01  AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);     

pipeline=# CREATE CONTINUOUS VIEW sv02 AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);

pipeline=# CREATE CONTINUOUS VIEW sv03 AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);
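For intuition, the rollup these views maintain incrementally can be sketched as a batch computation in Python (an illustrative in-memory version with hypothetical sample data; PipelineDB does this continuously as rows arrive):

```python
from collections import defaultdict
from datetime import datetime

def date_trunc(unit, ts):
    """Truncate a timestamp the way PostgreSQL's date_trunc() does."""
    if unit == "min":
        return ts.replace(second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(unit)

def rollup(events, unit):
    """events: iterable of (gid, crt_time, val) -> {(gid, bucket): stats},
    mirroring max/min/avg/count grouped by gid and truncated time."""
    acc = defaultdict(lambda: {"max": None, "min": None, "sum": 0, "count": 0})
    for gid, crt_time, val in events:
        s = acc[(gid, date_trunc(unit, crt_time))]
        s["max"] = val if s["max"] is None else max(s["max"], val)
        s["min"] = val if s["min"] is None else min(s["min"], val)
        s["sum"] += val
        s["count"] += 1
    return {k: {**s, "avg": s["sum"] / s["count"]} for k, s in acc.items()}

events = [(1, datetime(2015, 12, 15, 13, 44, 10), 1),
          (1, datetime(2015, 12, 15, 13, 44, 50), 3)]
print(rollup(events, "min"))
```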

Activate streams.

pipeline=# activate;    
ACTIVATE

Insert data for testing.

pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());    
INSERT 0 1
pipeline=# select * from sv01;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:44:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)

pipeline=# select * from sv02;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:00:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)

pipeline=# select * from sv03;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 00:00:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)

The stress testing result is as follows:

Assume that 100,000 sensors are available and the values they upload range from 1 to 100.

$ vi test.sql    
\setrandom gid 1 100000
\setrandom val 1 100
insert into stream01(gid,val,crt_time) values (:gid,:val,now());

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 24
number of threads: 24
duration: 100 s
number of transactions actually processed: 10114821
latency average: 0.235 ms
latency stddev: 0.530 ms
tps = 101089.580065 (including connections establishing)
tps = 101101.483296 (excluding connections establishing)
statement latencies in milliseconds:
0.003051 \setrandom gid 1 100000
0.000866 \setrandom val 1 100
0.230430 insert into stream01(gid,val,crt_time) values (:gid,:val,now());

The application needs to process about 100,000 records per second; the statistical dimensions are those shown in the preceding stream SQL.

After several rounds of testing, the result is as follows:

pipeline=# select sum(count) from sv03;    
sum
----------
53022588
(1 row)

pipeline=# select * from sv01 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:44:00 | 1 | 1 | 1.00000000000000000000 | 1
53693 | 2015-12-15 13:47:00 | 68 | 1 | 28.0000000000000000 | 6
588 | 2015-12-15 13:47:00 | 88 | 11 | 47.6250000000000000 | 8
60154 | 2015-12-15 13:47:00 | 95 | 1 | 40.9090909090909091 | 11
38900 | 2015-12-15 13:47:00 | 90 | 17 | 57.2000000000000000 | 5
12784 | 2015-12-15 13:47:00 | 93 | 13 | 64.1250000000000000 | 8
79782 | 2015-12-15 13:47:00 | 60 | 16 | 43.1666666666666667 | 6
5122 | 2015-12-15 13:47:00 | 100 | 3 | 46.8333333333333333 | 12
97444 | 2015-12-15 13:47:00 | 98 | 9 | 59.5833333333333333 | 12
34209 | 2015-12-15 13:47:00 | 86 | 13 | 52.2857142857142857 | 7
(10 rows)

pipeline=# select * from sv02 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+---------------------+-------
91065 | 2015-12-15 14:00:00 | 100 | 0 | 51.4299065420560748 | 321
24081 | 2015-12-15 14:00:00 | 100 | 0 | 52.1649831649831650 | 297
29013 | 2015-12-15 14:00:00 | 100 | 0 | 50.9967213114754098 | 305
13134 | 2015-12-15 14:00:00 | 100 | 0 | 49.6968750000000000 | 320
84691 | 2015-12-15 14:00:00 | 100 | 0 | 49.5547445255474453 | 274
91059 | 2015-12-15 14:00:00 | 100 | 1 | 47.7536764705882353 | 272
50115 | 2015-12-15 14:00:00 | 100 | 1 | 49.4219269102990033 | 301
92610 | 2015-12-15 14:00:00 | 100 | 0 | 50.1197183098591549 | 284
36616 | 2015-12-15 14:00:00 | 100 | 1 | 48.8750000000000000 | 312
46390 | 2015-12-15 14:00:00 | 99 | 0 | 48.3246268656716418 | 268
(10 rows)

pipeline=# select * from sv03 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+---------------------+-------
68560 | 2015-12-15 00:00:00 | 100 | 0 | 51.2702702702702703 | 555
42241 | 2015-12-15 00:00:00 | 100 | 0 | 49.5266903914590747 | 562
64946 | 2015-12-15 00:00:00 | 100 | 0 | 48.2409177820267686 | 523
2451 | 2015-12-15 00:00:00 | 100 | 0 | 49.8153564899451554 | 547
11956 | 2015-12-15 00:00:00 | 100 | 0 | 51.2382739212007505 | 533
21578 | 2015-12-15 00:00:00 | 100 | 0 | 49.2959558823529412 | 544
36451 | 2015-12-15 00:00:00 | 100 | 0 | 51.1292035398230088 | 565
62380 | 2015-12-15 00:00:00 | 100 | 0 | 48.9099437148217636 | 533
51946 | 2015-12-15 00:00:00 | 100 | 0 | 51.0318091451292247 | 503
35084 | 2015-12-15 00:00:00 | 100 | 0 | 49.3613766730401530 | 523
(10 rows)

Scenario 2: Vehicle Sensor Data Analysis

Assume that the location information is uploaded at a regular interval in the vehicle running process.

gid, crt_time, poi

The application needs to draw each vehicle's trajectory by day, aggregating multiple points into a path, array, or string type.

Assume that there are 10 million vehicles, and each vehicle uploads its coordinates and a timestamp, either individually or in batches.

The application requirements are as follows:

  • Draw the vehicle trajectory by day.
  • Collect hourly statistics on the number of vehicles passing through each region.

Create a continuous view on the stream. Assume that the point information has been binary-encoded as INT8 to facilitate stress testing.

CREATE CONTINUOUS VIEW sv04  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);

The stress testing result is as follows:

$ vi test.sql    
\setrandom gid 1 10000000
\setrandom poi 1 1000000000
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 24
number of threads: 24
duration: 100 s
number of transactions actually processed: 11112035
latency average: 0.213 ms
latency stddev: 0.836 ms
tps = 111106.652772 (including connections establishing)
tps = 111118.651135 (excluding connections establishing)
statement latencies in milliseconds:
0.002939 \setrandom gid 1 10000000
0.000887 \setrandom poi 1 1000000000
0.209177 insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());

pipeline=# select * from sv04 limit 3;
448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}
7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}
8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}
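The daily-trajectory aggregation that sv04 performs can be sketched outside the database as follows (an illustrative Python version with hypothetical sample points, not the production path):

```python
from collections import defaultdict
from datetime import datetime

def daily_trajectories(points):
    """points: iterable of (gid, poi, crt_time).
    Returns {(gid, day): ["poi -> ts", ...]}: per-vehicle, per-day ordered
    waypoints, mirroring sv04's array_agg of poi||' -> '||crt_time."""
    traj = defaultdict(list)
    for gid, poi, crt_time in sorted(points, key=lambda p: p[2]):
        day = crt_time.replace(hour=0, minute=0, second=0, microsecond=0)
        traj[(gid, day)].append(f"{poi} -> {crt_time}")
    return dict(traj)

pts = [(448955, 306029686, datetime(2015, 12, 15, 14, 53, 1)),
       (448955, 885962518, datetime(2015, 12, 15, 14, 53, 3))]
print(daily_trajectories(pts))
```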

Scenario 3: Vehicle Status Analysis

Collect statistics on vehicle information collected by traffic probes.

For example:

  • Collect the probe location information by vehicle and generate trajectory data accordingly.
  • Collect statistics on the traffic information by probe at each intersection. Assume that one probe is mapped to one intersection.

The first requirement is the same as drawing the vehicle trajectory in the previous example. Traffic statistics for each intersection are aggregated by probe ID.

The methods for using PipelineDB are similar, so no more examples are given here.

Scenario 4: Predicting Stock Prices in Real Time

You can use MADlib or PL/R to run multiple rounds of regression, select the model with the best R², and predict the next group of stock prices from its intercept and slope.

You need to use UDFs in this scenario. For more information about how to use UDFs, see the previous articles.

No more examples are given here.
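As a minimal out-of-database illustration of the idea (fit several candidate windows, keep the best R², extrapolate from the intercept and slope), here is a Python sketch with toy prices; in this scenario the fitting would actually run inside the database as a MADlib or PL/R UDF:

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope*x, plus R^2."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    sxx = sum((x - mx) ** 2 for x in xs)
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = my - slope * mx
    ss_res = sum((y - (intercept + slope * x)) ** 2 for x, y in zip(xs, ys))
    ss_tot = sum((y - my) ** 2 for y in ys)
    r2 = 1 - ss_res / ss_tot if ss_tot else 1.0
    return intercept, slope, r2

# Fit several trailing windows, keep the one with the best R^2,
# then extrapolate one step ahead (toy prices, for illustration only).
prices = [10.0, 10.2, 10.4, 10.7, 10.9, 11.1]
intercept, slope, r2 = max(
    (fit_line(list(range(len(prices) - w, len(prices))), prices[-w:])
     for w in (3, 4, 5)),
    key=lambda f: f[2],
)
next_price = intercept + slope * len(prices)
```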

Scenario 5: Targeted Marketing through Wi-Fi Sensors

Collect real-time statistics from Wi-Fi sensors in shopping malls.

Count the people in each store in real time based on the location information provided by Wi-Fi. Statistical dimensions include the average stay duration and total stay duration of each store.
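The per-store dimensions described above (headcount, total stay duration, average stay duration) can be sketched like this, assuming each Wi-Fi sighting has already been resolved into a (user, store, enter, leave) interval; the field names and data are hypothetical:

```python
from collections import defaultdict
from datetime import datetime

def store_stats(sightings):
    """sightings: iterable of (user, store, enter_ts, leave_ts).
    Returns {store: (unique_visitors, total_seconds, avg_seconds_per_visit)}."""
    total = defaultdict(float)
    users = defaultdict(set)
    visits = defaultdict(int)
    for user, store, enter, leave in sightings:
        total[store] += (leave - enter).total_seconds()
        users[store].add(user)
        visits[store] += 1
    return {s: (len(users[s]), total[s], total[s] / visits[s]) for s in total}

data = [("u1", "storeA", datetime(2019, 10, 14, 10, 0), datetime(2019, 10, 14, 10, 30)),
        ("u2", "storeA", datetime(2019, 10, 14, 10, 5), datetime(2019, 10, 14, 10, 15))]
print(store_stats(data))
```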

Scenario 6: Miscellaneous

Assume that the existing functions of PG cannot cope with your data processing scenario. What should you do? PG provides a series of APIs, such as UDFs, data types, operators, and indexing methods, to help you solve this problem. You can use these APIs to address your business requirements.

There are many other usage patterns that cannot all be listed here.

Integrating Kafka with PipelineDB

Kafka is a very popular message queue from which PipelineDB can retrieve data and perform real-time computing. The setup is as follows.

Start a local NGINX server and use Siege to simulate HTTP requests. NGINX records these requests in a JSON-formatted log file.

Start a local Kafka server and use kafkacat to continuously push NGINX access logs to Kafka.

Subscribe to Kafka messages in PipelineDB and convert the data into the desired statistical information in real time, for example, web page visitor count or latency.

Install Kafka.

http://kafka.apache.org/07/quickstart.html    

# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
# tar -zxvf kafka_2.10-0.8.2.2.tgz

# git clone https://github.com/edenhill/librdkafka.git
# cd librdkafka
./configure
make
make install

# git clone https://github.com/lloyd/yajl.git
# cd yajl
./configure
make
make install

# vi /etc/ld.so.conf
/usr/local/lib
# ldconfig

# git clone https://github.com/edenhill/kafkacat.git
# cd kafkacat
./configure
make
make install

Install Siege and NGINX.

# yum install -y siege nginx

Create an NGINX configuration file and record the access logs in /tmp/access.log in JSON format.

cd /tmp    

cat <<EOF > nginx.conf
worker_processes 4;
pid $PWD/nginx.pid;
events {}
http {

log_format json
'{'
'"ts": "\$time_iso8601", '
'"user_agent": "\$http_user_agent", '
'"url": "\$request_uri", '
'"latency": "\$request_time", '
'"user": "\$arg_user"'
'}';

access_log $PWD/access.log json;
error_log $PWD/error.log;

server {
location ~ ^/ {
return 200;
}
}
}
EOF

Start the NGINX server.

nginx -c $PWD/nginx.conf -p $PWD/

Configure the host name.

# hostname    
digoal.org
# vi /etc/hosts
127.0.0.1 digoal.org

Start the Kafka server.

cd /opt/soft_bak/kafka_2.10-0.8.2.2    
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

Generate a random URL file.

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done

Use Siege to simulate access to these URLs. NGINX will generate the access logs in /tmp/access.log.

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1    

An example of /tmp/access.log, in JSON format:
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002", "user": "18583"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003", "user": "24827"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003", "user": "3988"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003", "user": "18433"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001", "user": "10801"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001", "user": "4915"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001", "user": "5367"}

Output access logs to kafkacat and push them to the Kafka message system. The corresponding topic is logs_topic.

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &

To verify, you can consume the topic directly with the console consumer:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2    
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning
# Ctrl+C

Next, use PipelineDB to consume these messages in real time and convert them into the desired statistical results.

CREATE EXTENSION pipeline_kafka;    
SELECT kafka_add_broker('localhost:9092'); -- Add a Kafka broker, that is, a node of the Kafka cluster.
CREATE STREAM logs_stream (payload json); -- Create a stream and map it to the Kafka message system.
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream; -- Create a continuous view to consume and process Kafka messages in real time.
SELECT kafka_consume_begin('logs_topic', 'logs_stream'); -- Start to consume the specified topic, logs_topic.
kafka_consume_begin
------------------
success
(1 row)

Query the continuous view to obtain the current NGINX access statistics.

SELECT * FROM message_count;    
count
--------
24
(1 row)

SELECT * FROM message_count;
count
--------
36
(1 row)

Next, conduct an in-depth real-time analysis of the total visits, number of unique visitors, and 99th-percentile access latency for each URL.

/*     
* This function will strip away any query parameters from each url,
* as we're not interested in them.
*/
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')
RETURNS text
AS 'textregexreplace_noopt' -- textregexreplace_noopt@src/backend/utils/adt/regexp.c
LANGUAGE internal;

CREATE CONTINUOUS VIEW url_stats AS
SELECT
url, -- URL
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99, -- 99th-percentile request latency
count(DISTINCT user) AS uniques, -- Number of unique visitors
count(*) total_visits -- Total visits
FROM
(SELECT
url(payload->>'url'), -- URL
payload->>'user' AS user, -- User ID
(payload->>'latency')::float * 1000 AS latency_ms, -- Access latency
arrival_timestamp
FROM logs_stream) AS unpacked
WHERE arrival_timestamp > clock_timestamp() - interval '1 day'
GROUP BY url;
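To make the per-URL dimensions concrete, here is a batch Python sketch over the same JSON log format (an illustration only: it uses a nearest-rank 99th percentile, whereas percentile_cont interpolates, and the log lines are hypothetical):

```python
import json
import math
import re

def url_stats(log_lines):
    """Aggregate per-URL total visits, unique users, and p99 latency (ms)
    from NGINX JSON log lines, the same dimensions as the url_stats view."""
    stats = {}
    for line in log_lines:
        rec = json.loads(line)
        url = re.sub(r"\?.*", "", rec["url"])  # strip query parameters
        s = stats.setdefault(url, {"latencies": [], "users": set(), "visits": 0})
        s["latencies"].append(float(rec["latency"]) * 1000)
        s["users"].add(rec["user"])
        s["visits"] += 1
    out = {}
    for url, s in stats.items():
        lat = sorted(s["latencies"])
        # nearest-rank p99 (percentile_cont interpolates; close for large n)
        p99 = lat[min(len(lat) - 1, math.ceil(0.99 * len(lat)) - 1)]
        out[url] = {"p99": p99, "uniques": len(s["users"]), "total_visits": s["visits"]}
    return out

logs = ['{"ts": "2015-10-21T11:21:48+08:00", "url": "/page68/path7?user=1", "latency": "0.002", "user": "1"}',
        '{"ts": "2015-10-21T11:21:49+08:00", "url": "/page68/path7?user=2", "latency": "0.004", "user": "2"}']
print(url_stats(logs))
```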

CREATE CONTINUOUS VIEW user_stats AS
SELECT
day(arrival_timestamp),
payload->>'user' AS user,
sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,
sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,
count(DISTINCT url(payload->>'url')) AS unique_urls,
count(*) AS total_visits
FROM logs_stream GROUP BY payload->>'user', day;

-- What are the top-10 most visited urls?
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;
url | total_visits
---------------+--------------
/page62/path4 | 10182
/page51/path4 | 10181
/page24/path5 | 10180
/page93/path3 | 10180
/page81/path0 | 10180
/page2/path5 | 10180
/page75/path2 | 10179
/page28/path3 | 10179
/page40/path2 | 10178
/page74/path0 | 10176
(10 rows)


-- What is the 99th percentile latency across all urls?
SELECT combine(p99) FROM url_stats;
combine
------------------
6.95410494731137
(1 row)

-- What is the average conversion rate each day for the last month?
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;
day | avg
------------------------+----------------------------
2015-09-15 00:00:00-07 | 1.7455000000000000000000000
(1 row)

-- How many unique urls were visited each day for the last week?
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;
day | combine
------------------------+---------
2015-09-15 00:00:00-07 | 100000
(1 row)

-- Is there a relationship between the number of unique urls visited and the highest conversion rates?
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats
GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;
unique_urls | conversion_rate
-------------+-------------------
41 | 2.67121005785842
36 | 2.02713894173361
34 | 2.02034637010851
31 | 2.01958418072859
27 | 2.00045348712296
24 | 1.99714899522942
19 | 1.99438839453606
16 | 1.98083502184886
15 | 1.87983011139079
14 | 1.84906254929873
(10 rows)

When PipelineDB is used together with Kafka, more application scenarios are available.

Conclusion

Many DBAs in the IoT field have probably wondered how to build a larger real-time message processing cluster with PipelineDB and Kafka. A common approach is to plan the data sharding rules properly so that cross-node statistics are avoided; if cross-node access is needed, use dimension tables on each node.

For example, how should you process one trillion messages every day? Based on the preceding stress testing results, each server processes an average of 100,000 records per second (8.64 billion records per day), so about 116 PostgreSQL servers are needed for the computation. This is easy and convenient.
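The sizing arithmetic behind that server count is straightforward:

```python
# Back-of-the-envelope sizing for the "one trillion messages per day" case.
per_server_tps = 100_000                  # from the pgbench runs above
per_server_day = per_server_tps * 86_400  # seconds per day -> 8.64 billion/day
target_per_day = 1_000_000_000_000        # one trillion messages per day

servers = -(-target_per_day // per_server_day)  # ceiling division
print(servers)  # 116
```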

As shown in the following figure, every layer can be expanded, including LVS, HAproxy, Kafka, PostgreSQL, and HAWQ.

Original Source
