ClickHouse: A Business Weapon for Crowd Selection

Alibaba Cloud
Jun 22, 2021

What Is Crowd Selection?

Data platforms across industries keep accumulating more data, and demand for personalized user operations keeps growing with them. The user tag system has therefore emerged as a basic service for personalized operations. Real-time, precise marketing is now required in almost all industries, including the Internet, gaming, and education. In marketing, you locate target groups by building user profiles and filtering users with a combination of conditions. For example:

  • E-Commerce Industry: Before activities, merchants need to select a group of target customers according to the characteristics of the target groups to push advertisements to and assess if they fit the activities.
  • Gaming Industry: The platforms present gifts to players based on player characteristics to improve player activity.
  • Education Industry: Targeted exercises need to be pushed to students based on their characteristics to help them learn.
  • Business Scenarios, such as Searches, Portals, and Video Websites: Specific content is pushed to users based on user concerns.

Let’s take a typical target crowd selection scenario in e-commerce platforms as an example. The following table describes how to collect, tag, and clean potential customer information in the apparel industry:

In the table above, the first column is the unique identifier of the users, which is often used as the primary key. The other columns are all tag columns.

If the company wants to launch a high-end male sports product, the selection conditions are listed below:

  1. Male: The target group for the product is male.
  2. Sports Enthusiasts: Sports enthusiasts are more likely to consume sports products.
  3. First-Tier Cities: Users in first-tier cities may consume more high-end products compared to users in second and third-tier cities.
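
The three conditions above amount to a logical AND over tag columns. As a minimal, hedged sketch of that idea (an in-memory illustration only, not the system described in this article), the Java snippet below filters a few made-up user profiles. The class name, field names, and sample values are all assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TagFilterSketch {
    // Hypothetical in-memory user profile; field names are illustrative only.
    static final class User {
        final long userId;
        final String gender;
        final boolean likesSports;
        final String cityLevel;

        User(long userId, String gender, boolean likesSports, String cityLevel) {
            this.userId = userId;
            this.gender = gender;
            this.likesSports = likesSports;
            this.cityLevel = cityLevel;
        }
    }

    // Keep only the users that satisfy all three selection conditions.
    static List<Long> select(List<User> users) {
        List<Long> selected = new ArrayList<>();
        for (User u : users) {
            if ("male".equals(u.gender) && u.likesSports && "first-tier".equals(u.cityLevel)) {
                selected.add(u.userId);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
            new User(1, "male", true, "first-tier"),
            new User(2, "female", true, "first-tier"),
            new User(3, "male", false, "first-tier"),
            new User(4, "male", true, "third-tier"));
        System.out.println(select(users)); // prints [1]
    }
}
```

With hundreds of tag columns and arbitrary condition combinations, this kind of filter has to be pushed down into the database, which is what the rest of the article addresses.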

The table above shows the typical table structure for crowd selection, with the first column displaying the user ID and all the others as tag columns and query conditions. The crowd selection service faces some common pain points:

  • Various user tags with hundreds or thousands of tag columns
  • A large amount of data and users that require a great deal of computing
  • Various Combinations of Selection Conditions: No fixed index is available for optimization and the storage space usage is high.
  • High Performance Requirements: Selection results must be returned promptly. Too much delay may result in inaccurate group marketing.
  • High Requirements on the Timeliness of Data Updates: User profiles need to be updated in near real-time. Stale user information directly reduces the accuracy of the selection.

This article provides an in-depth analysis and description of how to build a crowd selection system with ClickHouse, why ClickHouse is a good fit, and its comparative advantages.

Why ClickHouse?

This article uses open-source Elasticsearch (ES) as the baseline for evaluating ClickHouse in crowd selection. Open-source ES is an efficient search engine that can perform complex combination operations and data aggregation thanks to its indexing technology. ClickHouse is a popular open-source columnar analytical database whose core strengths are a high storage compression ratio and fast query performance, especially on large tables. ES has the capabilities needed for crowd selection, but compared with ClickHouse it falls short in the following three areas:

Cost

Open-source Elasticsearch (ES) uses Lucene as the underlying storage, which mainly includes row storage (stored fields), columnar storage (doc values), and inverted indexes. The _source field in row storage holds the raw document data. When writing data, ES treats the entire JSON structure of the raw document as a string and stores it in the _source field. Hence, the _source field occupies a large amount of storage space, and update operations are not supported once the _source field is disabled.

Indexing is also an indispensable part of ES. By default, all columns are indexed in ES. Specific columns can be set to be unindexed, but queries are then not supported on those columns. In the crowd selection scenario, the conditions for filtering tags are arbitrary, varied, and change constantly, so it is unrealistic to leave any tag column unindexed. Therefore, for a large, wide table with hundreds of columns, a full-column index doubles the storage cost.

ClickHouse is a purely columnar database. Since it neither relies on per-column indexes for queries nor forces index building, no additional index files are retained. The number of data replicas stored by ClickHouse can also be adjusted to minimize costs.

Data Update and Governance

Indexes give ES efficient query performance, but building them is complicated and time-consuming. Each index build requires scanning and sorting the data in a column to generate an index file. In the crowd selection business, however, crowd information grows continuously. The resulting constant tag updates force ES to rebuild its indexes frequently, which places a heavy burden on ES performance.

ClickHouse queries are index-independent, and no index creation is required. Therefore, newly added data incurs no index update or maintenance work in ClickHouse.

Ease of Use

Open-source ES does not fully support SQL, and the JSON format of its query requests is complex. ES also lacks optimization of the execution strategy for multi-condition filtering and aggregation. Take selecting the target group for the high-end male sports product mentioned above as an example. It can be expressed with the following SQL statement:

SELECT user_id FROM whatever_table WHERE city_level = 'first-tier city' AND gender = 'male' AND is_like_sports = 'yes';

For the preceding SQL statement, ES performs three index scans, one per tag, and then merges the results of the three scans, as shown in the following figure:

ClickHouse executes the query more elegantly. ClickHouse accepts standard SQL, which is simple and practical. When the WHERE clause is executed, ClickHouse automatically adds a PREWHERE stage, so each subsequent scan only covers the rows that passed the previous one, as shown in the following figure:

ClickHouse optimizes the filtering process for complex multi-condition scenarios. It scans less data and performs better than ES.
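
The difference between the two strategies can be illustrated with a small, hedged Java sketch. It is a simplified row-level model, not how either engine is actually implemented (ClickHouse, for instance, works on blocks of rows rather than single rows). The class name, the sample columns, and the cellsRead counter are assumptions made for illustration. Tag values are encoded as integers, with city_level 0 standing for a first-tier city and 1 standing for "male" and "yes".

```java
import java.util.ArrayList;
import java.util.List;

class ProgressiveFilterSketch {
    // Counts how many column cells a strategy had to read.
    static long cellsRead = 0;

    // Strategy 1: scan every column in full, then intersect the three match lists.
    static List<Integer> scanAndMerge(int[] city, int[] gender, int[] sports) {
        cellsRead = 0;
        boolean[] a = fullScan(city, 0);
        boolean[] b = fullScan(gender, 1);
        boolean[] c = fullScan(sports, 1);
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < city.length; i++) {
            if (a[i] && b[i] && c[i]) {
                rows.add(i);
            }
        }
        return rows;
    }

    static boolean[] fullScan(int[] column, int wanted) {
        boolean[] match = new boolean[column.length];
        for (int i = 0; i < column.length; i++) {
            cellsRead++;
            match[i] = column[i] == wanted;
        }
        return match;
    }

    // Strategy 2: each later column is read only for rows that survived the earlier filters.
    static List<Integer> progressive(int[] city, int[] gender, int[] sports) {
        cellsRead = 0;
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < city.length; i++) {
            rows.add(i);
        }
        rows = filter(rows, city, 0);
        rows = filter(rows, gender, 1);
        rows = filter(rows, sports, 1);
        return rows;
    }

    static List<Integer> filter(List<Integer> candidates, int[] column, int wanted) {
        List<Integer> kept = new ArrayList<>();
        for (int row : candidates) {
            cellsRead++;
            if (column[row] == wanted) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int[] city = {0, 1, 0, 2, 0, 3, 0, 1};
        int[] gender = {1, 1, 0, 1, 1, 0, 1, 1};
        int[] sports = {1, 0, 1, 1, 0, 1, 1, 1};
        System.out.println(scanAndMerge(city, gender, sports) + " cells read: " + cellsRead); // [0, 6] cells read: 24
        System.out.println(progressive(city, gender, sports) + " cells read: " + cellsRead);  // [0, 6] cells read: 15
    }
}
```

Both strategies return the same rows, but the progressive one reads fewer cells, and the gap widens as the first condition becomes more selective.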

Building a Crowd Selection System Based on ClickHouse

With the technology chosen, the next step is to build a crowd selection system based on ClickHouse. Based on the service description at the beginning of the article and the typical SQL statement above (SELECT user_id FROM whatever_table WHERE city_level = 'first-tier city' AND gender = 'male' AND is_like_sports = 'yes';), the crowd selection business requires the following database capabilities:

  1. Efficient batch data import performance
  2. Frequent processing and real-time updating
  3. DDL ability to add and delete columns
  4. Efficient query capability by specifying any column to be filtered

How can we make full use of ClickHouse in the crowd selection scenario while avoiding its disadvantages and addressing the needs listed above?

Insert Statement in Place of Update Statement

First, it helps to understand the asynchronous Update mechanism of ClickHouse. ClickHouse executes Update operations inefficiently. Once the MergeTree storage engine in the ClickHouse kernel generates a data part, that data part cannot be changed. At the MergeTree storage level, ClickHouse is therefore poorly suited to updating and deleting data. ClickHouse does not support a bare Update statement, so the operation is exposed through ALTER TABLE instead:

ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;

When an Update operation like the one above is submitted and acknowledged, the ClickHouse kernel has only done the following two things:

  1. Checks whether the Update operation is legal
  2. Saves the Update command to the storage file and awakens a worker thread to process merge and mutation asynchronously.

Although the details are complex, the workflow of the asynchronous thread can be summarized as follows:

  • Locate the data part that contains the data to be updated
  • Scan the entire data part and update the data that needs changing
  • Write the data back to disk to generate a new data part
  • Remove the expired data part and replace it with the new one
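
To make the cost of this workflow concrete, here is a hedged Java sketch that models data parts as immutable in-memory lists. It is a toy model under stated assumptions, not ClickHouse code: the class, the Row type, and the rowsRewritten counter are hypothetical. The point is that changing a single row forces a rewrite of every row in the part that holds it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MutationSketch {
    // Hypothetical row with one tag column.
    static final class Row {
        final long userId;
        final String cityLevel;

        Row(long userId, String cityLevel) {
            this.userId = userId;
            this.cityLevel = cityLevel;
        }
    }

    // Each data part is immutable once written.
    final List<List<Row>> parts = new ArrayList<>();
    int rowsRewritten = 0;

    void writePart(Row... rows) {
        parts.add(Collections.unmodifiableList(new ArrayList<>(Arrays.asList(rows))));
    }

    void mutate(long userId, String newCityLevel) {
        for (int p = 0; p < parts.size(); p++) {
            List<Row> oldPart = parts.get(p);
            // Step 1: locate the data part that contains the row to update.
            boolean affected = false;
            for (Row r : oldPart) {
                if (r.userId == userId) {
                    affected = true;
                }
            }
            if (!affected) {
                continue;
            }
            // Steps 2 and 3: scan the whole part and write every row into a new part.
            List<Row> newPart = new ArrayList<>();
            for (Row r : oldPart) {
                newPart.add(r.userId == userId ? new Row(r.userId, newCityLevel) : r);
                rowsRewritten++;
            }
            // Step 4: swap the expired part for the new one.
            parts.set(p, Collections.unmodifiableList(newPart));
        }
    }

    public static void main(String[] args) {
        MutationSketch table = new MutationSketch();
        table.writePart(new Row(1, "first-tier"), new Row(2, "third-tier"), new Row(3, "second-tier"));
        table.writePart(new Row(4, "first-tier"), new Row(5, "second-tier"));
        table.mutate(2, "second-tier");
        System.out.println("rows rewritten: " + table.rowsRewritten); // rows rewritten: 3
    }
}
```

In a real table a data part can hold a very large number of rows, so the amount of rewriting per Update is far higher than in this toy example.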

This is how ClickHouse executes the Update command. Frequent Update operations are disastrous for ClickHouse. Therefore, Insert statements are used in place of Update statements: to update the tags of a specified user, insert a new row for that user. For example, update the data of user 07 in the table:

Consequently, there may be multiple records for each user. In the crowd selection scenario, a user's redundant and out-of-order records distort query results, so the requirement for precise selection cannot be met. The same user's data must therefore be updated by overwriting the existing row with the newly inserted one. The next section describes how to use the primary key to eliminate duplicates in ClickHouse.

Using the AggregatingMergeTree Table Engine

MergeTree is the core storage engine in ClickHouse and is similar in concept to an LSM-Tree. Its full complexity is difficult to explain in a single article, so the implementation principles are not covered here. This section focuses on the crowd selection scenario and describes how to use AggregatingMergeTree, a MergeTree variant, and the data aggregation effect it produces.

AggregatingMergeTree is derived from MergeTree and is identical to the basic MergeTree in terms of storage. However, it adds "additional merge logic" to the data merge process: AggregatingMergeTree replaces all rows with the same primary key (within one data part) with a single row that stores a combination of aggregate function states. Let's take the table schema from the beginning of this article as an example. The following shows how to create a table using the AggregatingMergeTree table engine:

CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default
(
    user_id UInt64,
    city_level SimpleAggregateFunction(anyLast, Nullable(Enum('First-tier city' = 0, 'Second-tier city' = 1, 'Third-tier city' = 2, 'Fourth-tier city' = 3))),
    gender SimpleAggregateFunction(anyLast, Nullable(Enum('Female' = 0, 'Male' = 1))),
    interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('No' = 0, 'Yes' = 1))),
    reg_date SimpleAggregateFunction(anyLast, DateTime),
    comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),
    last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),
    user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),
    province SimpleAggregateFunction(anyLast, Nullable(String)),
    last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),
    others SimpleAggregateFunction(anyLast, Array(String))
)
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(reg_date) ORDER BY user_id;

Based on the preceding CREATE TABLE statement, AggregatingMergeTree, combined with the anyLast function, stores every column except the primary key (user_id) as a pre-aggregated state. The anyLast aggregate function declares that the aggregation policy is to retain the most recently written value.
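
The effect of merging with anyLast can be sketched in plain Java. This is a hedged, in-memory illustration of the merge outcome, not the engine's implementation. It assumes that a NULL in a newer row does not overwrite an older non-NULL value (ClickHouse aggregate functions generally skip NULL arguments, but treat this as an assumption and verify it against your ClickHouse version). The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AnyLastMergeSketch {
    // One inserted row: user_id plus tag values; null means "tag not supplied in this insert".
    static final class Row {
        final long userId;
        final String[] tags;

        Row(long userId, String... tags) {
            this.userId = userId;
            this.tags = tags;
        }
    }

    // Collapse rows that share a user_id, keeping the last non-null value per tag column.
    // The TreeMap keeps the result sorted by user_id, mirroring ORDER BY user_id.
    static Map<Long, String[]> merge(List<Row> rowsInInsertOrder, int tagCount) {
        Map<Long, String[]> merged = new TreeMap<>();
        for (Row row : rowsInInsertOrder) {
            String[] state = merged.computeIfAbsent(row.userId, id -> new String[tagCount]);
            for (int i = 0; i < tagCount; i++) {
                if (row.tags[i] != null) {
                    state[i] = row.tags[i];
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Row> inserts = Arrays.asList(
            new Row(7, "Second-tier city", "Male", "No"),
            new Row(8, "First-tier city", "Female", "Yes"),
            // Later insert for user 7: city and interest change, gender is not supplied.
            new Row(7, "First-tier city", null, "Yes"));
        Map<Long, String[]> merged = merge(inserts, 3);
        System.out.println(Arrays.toString(merged.get(7L))); // [First-tier city, Male, Yes]
    }
}
```

After the merge, each user is represented by exactly one row, which is what the selection query needs.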

Data Consistency Assurance

The previous section describes how to select a table engine and an aggregation function for crowd selection scenarios. However, AggregatingMergeTree merges data in the background at an unspecified time, so it does not guarantee that the rows seen by a query are already aggregated, and it provides no flag for checking the aggregation state or progress. To make sure data is aggregated before it is queried, the optimize command must be issued manually to force the merge.

For convenience, the optimize command can be issued periodically, for example, every ten minutes. The interval can be chosen based on real-time business requirements and available computing resources. If the data volume is too large for a single optimize command to finish in time, the command can be issued in parallel at the partition level. Once the optimize operation completes, the deduplication logic takes effect.
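
Issuing the optimize command per partition in parallel could look roughly like the following Java sketch. It is a minimal illustration under stated assumptions: the runSql callback is a hypothetical stand-in for executing a statement over JDBC, the pool size of 4 is arbitrary, and the partition values assume the table is partitioned by toYYYYMM(reg_date).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class OptimizeSchedulerSketch {
    static String optimizeSql(String table, String partition) {
        return "OPTIMIZE TABLE " + table + " PARTITION " + partition + " FINAL";
    }

    // Submits one OPTIMIZE statement per partition to a small thread pool and waits for all of them.
    static void optimizeAll(String table, List<String> partitions, Consumer<String> runSql) {
        int threads = Math.max(1, Math.min(4, partitions.size()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String partition : partitions) {
                pending.add(pool.submit(() -> runSql.accept(optimizeSql(table, partition))));
            }
            for (Future<?> f : pending) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for OPTIMIZE", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("OPTIMIZE failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Here the callback only records the statements; a real one would run them on ClickHouse.
        List<String> issued = Collections.synchronizedList(new ArrayList<>());
        optimizeAll("whatever_table", Arrays.asList("202001", "202002", "202003"), issued::add);
        List<String> sorted = new ArrayList<>(issued);
        Collections.sort(sorted);
        sorted.forEach(System.out::println);
    }
}
```

To run this every ten minutes, optimizeAll could be wrapped in a task passed to ScheduledExecutorService.scheduleWithFixedDelay, so that a slow merge round never overlaps with the next one.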

Demo

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLTimeoutException;
import java.sql.Statement;

public class Main {
    public static void main(String[] args) throws ClassNotFoundException {
        String url = "your url";
        String username = "your username";
        String password = "your password";
        // Load the ClickHouse JDBC driver
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String connectionStr = "jdbc:clickhouse://" + url + ":8123";
        // try-with-resources closes the statement and the connection on exit
        try (Connection connection = DriverManager.getConnection(connectionStr, username, password);
             Statement stmt = connection.createStatement()) {
            // Create a local table
            String createLocalTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default " +
                "(user_id UInt64, " +
                "city_level SimpleAggregateFunction(anyLast, Nullable(Enum('First-tier city' = 0, 'Second-tier city' = 1, 'Third-tier city' = 2, 'Fourth-tier city' = 3))), " +
                "gender SimpleAggregateFunction(anyLast, Nullable(Enum('Female' = 0, 'Male' = 1))), " +
                "interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('No' = 0, 'Yes' = 1))), " +
                "reg_date SimpleAggregateFunction(anyLast, DateTime), " +
                "comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)), " +
                "last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)), " +
                "user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)), " +
                "province SimpleAggregateFunction(anyLast, Nullable(String)), " +
                "last_access_version SimpleAggregateFunction(anyLast, Nullable(String)), " +
                "others SimpleAggregateFunction(anyLast, Array(String))) " +
                "ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(reg_date) ORDER BY user_id;";
            stmt.execute(createLocalTableDDL);
            System.out.println("create local table done.");

            // Create a distributed table on top of the local table
            String createDistributedTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table_dist ON CLUSTER default " +
                "AS default.whatever_table " +
                "ENGINE = Distributed(default, default, whatever_table, intHash64(user_id));";
            stmt.execute(createDistributedTableDDL);
            System.out.println("create distributed table done");

            // Insert mock data: 100,000 users with random tag values
            String insertSQL = "INSERT INTO whatever_table(\n" +
                "    user_id,\n" +
                "    city_level,\n" +
                "    gender,\n" +
                "    interest_sports,\n" +
                "    reg_date,\n" +
                "    comment_like_cnt,\n" +
                "    last30d_share_cnt,\n" +
                "    user_like_consume_trend_type,\n" +
                "    province,\n" +
                "    last_access_version,\n" +
                "    others\n" +
                ") SELECT\n" +
                "    number as user_id,\n" +
                "    toUInt32(rand(11)%4) as city_level,\n" +
                "    toUInt32(rand(30)%2) as gender,\n" +
                "    toUInt32(rand(28)%2) as interest_sports,\n" +
                "    (toDateTime('2020-01-01 00:00:00') + rand(1)%(3600*24*30*4)) as reg_date,\n" +
                "    toUInt32(rand(15)%10) as comment_like_cnt,\n" +
                "    toUInt32(rand(16)%10) as last30d_share_cnt,\n" +
                "    randomPrintableASCII(64) as user_like_consume_trend_type,\n" +
                "    randomPrintableASCII(64) as province,\n" +
                "    randomPrintableASCII(64) as last_access_version,\n" +
                "    [randomPrintableASCII(64)] as others\n" +
                "FROM numbers(100000);\n";
            stmt.execute(insertSQL);
            System.out.println("Mock data and insert done.");

            System.out.println("Select count(user_id)...");
            ResultSet rs = stmt.executeQuery("select count(user_id) from whatever_table_dist");
            while (rs.next()) {
                long count = rs.getLong(1);
                System.out.println("user_id count: " + count);
            }

            // Merge data for the whole table (not executed in this demo)
            String optimizeSQL = "OPTIMIZE TABLE whatever_table FINAL;";
            // If the data merge time is too long, execute the optimize command in parallel at the partition level.
            String optimizeByPartitionSQL = "OPTIMIZE TABLE whatever_table PARTITION 202001 FINAL;";
            try {
                stmt.execute(optimizeByPartitionSQL);
            } catch (SQLTimeoutException e) {
                // View merge progress:
                // select * from system.merges where database = 'default' and table = 'whatever_table';
                Thread.sleep(60 * 1000);
            }

            // Crowd selection (city_level='First-tier city', gender='Male', interest_sports='Yes', registered up to now)
            String selectSQL = "SELECT user_id FROM whatever_table_dist WHERE city_level=0 AND gender=1 AND interest_sports=1 AND reg_date <= NOW();";
            rs = stmt.executeQuery(selectSQL);
            while (rs.next()) {
                // user_id is UInt64, so read it as a long
                long userId = rs.getLong(1);
                System.out.println("Got suitable user: " + userId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

PostScript

Alibaba Cloud has launched a managed ClickHouse cloud product. For more information, visit the ApsaraDB for ClickHouse product homepage.
