By Li Yunxiang (Xiangyun)
With the rapid development of the big data industry in recent years, various distributed products and architectures have emerged. I would like to discuss some tools that I have used in big data system practices in this article and work with you to build a distributed product roadmap.
The following figure shows the 2019 AI and big data landscape as outlined by the famous data thinker Matt Turck in his blog at https://mattturck.com/. Starting from 2012, Matt Turck has drawn one of these diagrams each year. It describes the companies in this industry and their data products as well as the problems these products address. Most of these products are commercial software, and most Internet companies are more familiar with intermediate green open-source products. Most of these open-source products belong to the Apache Software Foundation (ASF).
In the following sections, I will just give a simple introduction to some concepts. If you want a detailed description of these concepts, you can search for other articles online or on our official blog. You are welcome to contact me at any time to discuss content that you are interested in and correct any mistakes in this article.
Official website: http://hadoop.apache.org/
The Hadoop project contains many subprojects from computing to storage, such as Hadoop Distributed File System (HDFS), MapReduce, YARN, and HBase.
HDFS consists of one NameNode (NN) and multiple DataNodes (DNs). Data files are divided into multiple blocks. By default, these blocks have three replicas that are stored on different nodes based on the host or rack policy. Each block is 128 MB in size by default. As the disk addressing speed increases, the block size will also increase. The NN stores metadata for these blocks. When a client queries data, the DN provides the position of the required data. This structure has an obvious issue, which is the standalone NN. In Hadoop 2.x, some improvements have been made to the NN.
First, a secondary NN is added as the backup node of the primary serving NN. When the serving NN is faulty, the secondary NN automatically takes over service provision. The ZKFC is responsible for health status and service switching for the primary and secondary NNs. The Quorum Journal Node is responsible for information synchronization between the primary and secondary NNs.
Second, as a single NN stores a large amount of metadata, the NN will become a system bottleneck as the HDFS data volume continuously increases. To solve this problem, Hadoop 2.x adds the Federation technology, which allows multiple NNs to provide services simultaneously. These NNs horizontally split all file paths in DNs, and each DN is responsible for a different path, ensuring effective scale-out.
In addition to HDFS, Hadoop 2.x also introduces YARN, which manages cluster resources and schedules jobs. It consists of one ResourceManager (RM) and multiple NodeManagers (NMs). When a job is submitted to YARN, it starts an ApplicationMaster (AM) on a server. The AM applies for resources from the RM, the RM searches for idle resources in the cluster through an NM, and the NM packages resources as containers and submits them to the AM. The AM distributes data and programs to corresponding nodes for processing. If a job in a container fails to run, the AM applies for a new container from the RM.
Apache Hadoop HBase & Kudu
Official website: http://hbase.apache.org/
HBase is a well-known distributed column-based storage system and also a subproject of Hadoop. I will not give a detailed description of the advantages and disadvantages of column-based storage here. In this article, I will describe the write-ahead logging (WAL) and log-structured merge-tree (LSM tree) capabilities of HBase. In a system that uses WAL, data modifications are recorded in a log before they are applied. If the service crashes, the log can be used for data restoration. In MySQL, the redo log of the InnoDB engine is a typical application of WAL. In HBase, the HLog module implements WAL. You can find a lot of articles on the Internet that describe the LSM tree. In HBase, the LSM tree is the underlying storage structure of the Memstore module.
The Memstore module has three functions: (1) Data can be directly written to Memstore to improve the data writing efficiency. (2) Memstore can function as the cache for reading data. (3) Memstore periodically deduplicates data and stores data to the disk. The main components of HBase include HMaster and HRegionServer, which are in a one-to-many relationship. ZooKeeper is responsible for the status of all nodes. Kudu is similar to HBase. However, unlike HBase, Kudu does not rely on ZooKeeper to manage its clusters. In addition, HBase data is stored on HDFS, while Kudu has its own data file format.
Official website: https://spark.apache.org/
Spark is a distributed computing engine launched by the University of California, Berkeley. On the Spark official website, there is a chart that compares the performance of Spark and Hadoop. Despite the accuracy of this chart, Spark has certainly raised the performance of Hadoop (mainly MapReduce) by an order of magnitude. In my opinion, this is due to two factors. First, intermediate data generated during Spark computing is no longer stored to the disk, that is, the spill phase is removed. Second, a directed acyclic graph (DAG) is used to break down jobs into multiple phases, and each phase contains multiple tasks. With the DAG, tasks without dependencies can run concurrently.
In addition to good performance in batch processing, Spark focuses on building an entire ecosystem. Its stream processing framework, “Spark Streaming”, uses micro-batch processing to achieve excellent stream processing performance. It also launched Structured Streaming to ensure status-based stream processing. It provides Spark SQL to help non-developers call Spark services and Spark MLlib more conveniently.
However, Spark consumes a large amount of memory resources and is not as stable as MapReduce. Some large companies still use MapReduce for routine data warehouse tasks to ensure reliability. When our system was first switched from MapReduce to Spark, task exceptions frequently occurred at night. We eventually adjusted task and resource allocation and used a retry mechanism to solve this problem.
Official website: https://flink.apache.org/
Flink is a distributed computing system developed by Data Artisans, a Berlin-based startup. It was acquired by the Alibaba Group in early 2019. Spark and Kafka both have seen the excellent prospects of stream computing and set up their own stream computing ecosystems.
However, Spark Streaming uses micro-batch processing. Even though the batch size is small, it is essentially batch processing and cannot meet the requirements of certain scenarios. Now, Spark has transferred to Structured Streaming, which has made up for many shortcomings of Spark Streaming. However, it also uses micro-batch processing. Flink is real stream computing.
Flink is designed to consider both batch and stream processing. You can use one computing engine to meet the requirements of both batch and stream processing scenarios. To use Flink, I think you need to understand state management, the checkpoint fault tolerance mechanism, sliding windows, and watermark disorders. I will not give a detailed description of these features here. You can find a lot of articles about these features online.
Official website: https://impala.apache.org/
Impala is a query system developed by Cloudera using C++. It supports SQL semantics, can be used to query content in HDFS, HBase, and Kudu, and supports various serialization and compression formats. Impala also uses memory-based computing and is much faster than traditional MapReduce. Our team has used Spark and does not apply Impala on a large scale. Based on surveys and industry knowledge, other companies also seldom use Impala.
Official website: https://zookeeper.apache.org/
ZooKeeper is widely used in both data systems and other backend systems. It can be used as a distributed lock service, be used as a configuration center for systems, assist in leader election for consensus algorithms, and be used as the ZKFC for node health checks. In short, it is very useful. It uses the ZooKeeper Atomic Broadcast (ZAB) protocol that supports crash recovery. It consists of one leader and multiple followers and submits data based on the two-phase commit (2PC) protocol. When the leader crashes, the followers automatically switch their statuses to re-select a leader. After the leader is selected, data is aligned among multiple nodes.
Official website: https://sqoop.apache.org/
Sqoop is a tool for transferring data between traditional relational databases and HDFS. It provides a large number of parameters for both import and export operations. It adopts distributed execution, so its data transfer speed is fast. You need to pay attention to abnormal data in data sources, which often causes Sqoop to crash during data transfer. Sqoop 2 makes up for the simple features of Sqoop 1 but has a more complex architecture. I have not used Sqoop 2.
Official website: http://flume.apache.org/
Flume is a distributed data transfer tool that supports files, Netcat, JMS, HTTP, and other data sources. It consists of the source, channel, and sink. The source caches obtained data in the channel. The channel can be a file, memory, or JDBC. The sink consumes data from the channel and transfers the data to other system modules, such as HBase, HDFS, and Kafka.
Official website: http://kafka.apache.org/
Kafka was once a distributed message queue product developed by Scala. Since Kafka Streaming was launched, Kafka has become a stream processing platform. I will not give a detailed description of Kafka Streaming here because I have not used it.
Kafka queues are divided by topic. Each topic consists of multiple partitions. Messages in a partition are ordered. This structure ensures the sequential write of messages to the disk, saving the disk addressing time. Therefore, data can be written to disks quickly. In addition, mmap reduces the number of data copies between the user state and the kernel state. mmap is a technology that maps file content to memory addresses, significantly improving efficiency. Kafka and Flume form a classic framework for stream processing.
Apache Ranger & Sentry
Official website: http://ranger.apache.org/
Official website: http://sentry.apache.org/
Ranger and Sentry are distributed data security tools with basically the same features. Specifically, they both manage the permissions of products in the big data computing ecosystem. Sentry integrates itself into Impala, Hive, HDFS, Solr, and other products in the form of a plugin. When a user sends a request to these products, the products first send the request to the Sentry server for verification. Sentry can also be used together with Kerberos for cross-platform unified permission management. Ranger provides similar features but supports more products, including HDFS, HBase, Hive, YARN, Storm, Solr, Kafka, and Atlas. It also uses a Ranger Admin to connect to multiple Ranger plugins integrated into these products for permission verification.
Official website: https://atlas.apache.org/
Apache Atlas is an important product in a data governance system. It mainly manages metadata, that is, data used to describe data, such as the data type, name, attribute, function, lifecycle, valid range, and lineage. In a big data system, metadata has great value. Generally, a mature data system has a metadata management platform.
Metadata not only allows business personnel to understand our data and businesses more easily and quickly but also helps us improve the data quality, eliminate information asymmetry, and quickly locate data problems. Therefore, many people are always thinking about how to effectively use the metadata so that the data can generate greater value.
Currently, Atlas supports Hive, Sqoop, Storm, and other data sources and hook and batch import. It first uses batch synchronization and then uses hooks to acquire data source changes and update its own data.
Official website: http://kylin.apache.org/
Kylin is a distributed data warehouse product customized for online analytical processing (OLAP) scenarios. It provides multidimensional analysis and can seamlessly interwork with multiple BI analysis tools, such as Tableau and Superset. Kylin provides a frontend platform that allows you to customize data dimensions. Kylin periodically analyzes required data in advance, forms multiple cubes, and saves the cubes to HBase. The HBase environment must support Kylin deployment. During data processing and computing, Kylin consumes a large amount of resources of the device where it is located.
Apache Hive & Tez
Official website: https://hive.apache.org/
Official website: https://tez.apache.org/
Hive is the most famous data warehouse tool. It organizes data on HDFS into relational databases and provides Hive SQL for structured queries. Data analysts can seamlessly transit from traditional relational databases to HDFS. However, some Hive SQL functions are different from those of traditional SQL, and Hive SQL does not support update and delete operations by default. Developers can develop user-defined functions (UDFs) to expand Hive SQL functions. Hive performs computing based on MapReduce. In response to Spark SQL, the development team launched Hive on Spark. It ensures that Hive interprets, analyzes, and optimizes SQL statements while Spark executes SQL statements, achieving a speed similar to Spark SQL.
Tez is another optimization for Hive. It introduces DAGs and adds task concurrency to improve the query speed of Hive. However, it is essentially MapReduce. Its performance improvement is not very obvious when compared with that of Hive on Spark.
Official website: https://prestodb.io/
Presto is a distributed query engine developed by Facebook. It supports a lot of connectors that can connect multiple data sources on one platform and can perform aggregated computing for the content of these data sources. Presto also allows you to develop new connectors yourself. The entire Presto computing process is based on memory, so the speed is fast. However, Presto only provides obvious performance improvement in certain computing scenarios. For more information, you can find articles that provide a detailed analysis on the Internet. Presto was previously used to jointly query data in offline and real-time data warehouses and provide the results to real-time data platforms.
When using Presto, I discovered the following three shortcomings: (1) Presto often crashes and causes task failures when resources are tight because it uses memory-based computing. (2) Presto tasks are submitted in serial mode. Large tasks may block small tasks. Maybe parameter tuning can solve this problem, but I did not perform in-depth research. (3) There are no good web platforms to query Presto. Hue uses PostgreSQL to link Presto. However, this solution is complex. The mature Airpal platform is no longer updated. Finally, we used yanagishima. Even though the platform can provide basic features, it does not have user management features and permissions cannot be controlled.
Apache Parquet & Orc
Official website: https://parquet.apache.org/
Official website: https://orc.apache.org/
Parquet and ORC are two frequently used column-based storage formats. Column-based storage is different from row-based storage in traditional relational databases; these two storage models were generated due to different requirements in online transaction processing (OLTP) and OLAP scenarios. OLTP scenarios require storage systems to support quick Create, Read, Update, and Delete (CRUD) operations, which use the row as the unit. OLAP scenarios involve huge data volumes but have low real-time requirements. Column-based storage meets this requirement. When data is stored by columns, the engine reads smaller volumes of data during queries.
In addition, data in the same column usually has similar content and is easier to compress. However, column-based storage is not suitable for scenarios where data is frequently updated or deleted. Parquet and ORC both provide column-based storage, but use different storage formats. This causes performance differences when storing different types of data. According to some articles on the Internet, ORC has better performance than Parquet. However, Impala does not support ORC. Data lake products, such as Delta Lake, are based on Parquet. Therefore, you need to select a column-based storage format based on your own business features.
Official website: http://griffin.apache.org/
Data quality management is indispensable in a data system. Usually, we add some simple scripts in different ETL phases to check generated data. Apache Griffin is a data quality monitoring platform developed by eBay and escalated to a top-level Apache project. It provides data verification and alarm features and can visually display some parameters. You can complete related configuration steps on the Griffin page. In addition to some basic and simple data checks, such as whether exceptional values exist, Griffin compiles data statistics, such as the maximum, minimum, and median values. It also provides professional graph and table reports.
Official website: http://zeppelin.apache.org/
Zeppelin is a convenient online notebook with a similar user experience to Python’s Jupyter NoteBook. As shown in the preceding figure, you can execute and draw simple graphs and tables online. In addition, Zeppelin introduced the user concept, making multi-person collaboration more convenient. Zeppelin supports many data sources. You can call Hive, Cassandra, R, Kylin, Flink, Spark, ElasticSearch, HBase, Python, and Shell on this platform.
I have encountered unstable Spark connections when using Zeppelin and solved the problem through repeated logons. I like this tool very much.
Official website: http://superset.apache.org/
Superset is an open-source virtualization tool that allows you to quickly and easily create data dashboards. Similar products include Redash and Metabase. After some investigation, I personally prefer Superset. Tableau was introduced at the same time, so Superset was not used in actual projects.
Official website: https://www.tableau.com/
Unlike the other software I have introduced, Tableau is a commercial software, which is paid by the year based on the number of accounts purchased. I mention Tableau here because it is famous in the BI field. Tableau consists of a server and client. You can quickly generate a data dashboard through dragging and dropping on the client. The dashboard development work is transferred from the development side to the demand side. Tableau also provides complete user management features and supports many data sources. Compared with open-source software, commercial software provides more complete features and a better user experience. I think the only difficulty is how to implement the development and maintenance work on the demand side because this has a steep learning curve.
Official website: http://www.tpc.org/
The Transaction Processing Performance Council (TPC) is a non-profit organization formed by dozens of companies. It formulates the benchmark test specifications for different industries. Alibaba Cloud MaxCompute and OceanBase have participated in these tests and achieved good performance. TPCx-BB is a big data benchmark test tool. The tool simulates an online retail scenario. The tool inserts predefined tables and data into the tested system and then performs a series of SQL operations to evaluate the performance of a big data cluster.
TPC provides many ready-to-download tools for different test scenarios at: http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp