Ali-Perseus: Unified and Distributed Communication Framework for Deep Learning
Co-authors: Yu Ce (Gong Zhigang), Sun Jiang (Lin Lixiang), Fei Lian (Wang Zhiming), Yun Long (You Liang)
In recent years, deep learning has witnessed fast development in fields such as image recognition and NLP. More and more computing power is needed to train a variety of network models.
Take the typical medium-scale image classification network ResNet-50 as an example. Its baseline training accuracy is 76% Top-1 and 93% Top-5. Reaching this accuracy generally requires 90 epochs over the 1.28 million images of the ImageNet dataset, which takes about six days on a single P100 GPU. Training the recent groundbreaking BERT model or the pre-trained GPT-2 model on a single machine would take months or even years.
Therefore, distributed training is urgently needed in production environments to reduce model training time and speed up model iteration. To meet this need, the major deep learning frameworks have each implemented some form of support for distributed training.
| Framework | Parameter server (PS) | MPI (NCCL) |
|---|---|---|
| TensorFlow | Y | Y (Horovod) |
| MXNet | Y | Y (Horovod) |
| PyTorch | N | Y (Horovod, Gloo) |
| Caffe | N | Y (NVCaffe) |
According to our experiences, the existing distributed training frameworks have the following problems in the cloud environment:
- Each framework supports distributed training in a different form, so the work of moving from standalone to distributed training grows in proportion to the number of frameworks in use.
- The distributed implementations of all frameworks scale unsatisfactorily. Preliminary profiling shows that network bandwidth is not used efficiently. For example, training ResNet-50 with TensorFlow + Horovod on four machines with 32 P100 GPUs in total and a batch size of 128 achieves a scalability of only 86%, which is lower than expected.
- Optimizing scalability has to be done separately for each framework, which means repeated work and complex maintenance.
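The 86% figure in the second item can be read as scaling efficiency, that is, measured multi-GPU throughput divided by ideal linear throughput. The article does not state its exact formula, so the definition below is an assumption, and the throughput numbers are hypothetical values chosen only to illustrate the arithmetic.

```python
def scaling_efficiency(single_gpu_throughput, num_gpus, measured_throughput):
    """Return measured throughput as a fraction of ideal linear scaling."""
    ideal = single_gpu_throughput * num_gpus
    return measured_throughput / ideal


# Hypothetical numbers for illustration only (not measurements from the article).
single = 200.0     # images/s on one GPU
measured = 5504.0  # images/s on 32 GPUs
efficiency = scaling_efficiency(single, 32, measured)
print(f"{efficiency:.0%}")  # prints 86%
```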
These three issues point in the same direction: unify the distributed communication of the individual frameworks in one single architecture. We noticed that the open-source Horovod project has partially solved the preceding issues. Horovod currently supports TensorFlow, PyTorch, and Keras, and recently added support for MXNet. Horovod serves as a good reference: it shows that a unified distributed communication framework is feasible and that gradient synchronization can be abstracted into a framework-independent architecture. However, the biggest problem with Horovod is that it does not solve the scalability challenge on the cloud.
After discussion, we decided to develop a completely independent project, Ali-Perseus, and to create a highly optimized and unified distributed communication framework for deep learning on Alibaba Cloud.
Design of the Universal Distributed Communication Architecture
After considering the characteristics of Alibaba Cloud infrastructure, we developed a completely independent universal distributed framework. In addition to TensorFlow and PyTorch supported in Horovod, we also added support for MXNet and Caffe based on the needs of our main AI users. The following figure shows the system architecture.
Block diagram of the Ali-Perseus training communication framework
The goal of this system design is to make all communication-related operations transparent to the upper-layer training frameworks and to split the support for different frameworks into mutually independent modules. To achieve this goal, we adopted the following module design:
The communication framework needs to provide two types of interfaces: Python interfaces and C interfaces. The interface layer is required to provide the following features for performing upper-layer model training tasks:
- Create the communication layer and enable the support for the corresponding frameworks.
- Obtain the ID and size of the current process, both globally and within its node.
- Shut down the current communication framework.
For MXNet, PyTorch, and TensorFlow, Python interfaces must be provided for these features; for Caffe, only C interfaces are required. For a detailed description of the Python interfaces for individual frameworks, see the Framework layer section.
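The three interface-layer features above can be sketched as a small Python class. This is a minimal sketch, not the Ali-Perseus API: the class name `CommContext` and its methods are hypothetical, and it assumes the job was launched with Open MPI's `mpirun`, which exports the `OMPI_COMM_WORLD_*` environment variables read below.

```python
import os


class CommContext:
    """Hypothetical stand-in for the interface layer: init, rank/size queries, shutdown."""

    def __init__(self):
        self.initialized = False

    def init(self, env=None):
        # Assumption: Open MPI's mpirun exported these variables for this process.
        env = os.environ if env is None else env
        self.rank = int(env.get("OMPI_COMM_WORLD_RANK", 0))              # global ID
        self.size = int(env.get("OMPI_COMM_WORLD_SIZE", 1))              # global size
        self.local_rank = int(env.get("OMPI_COMM_WORLD_LOCAL_RANK", 0))  # ID within node
        self.local_size = int(env.get("OMPI_COMM_WORLD_LOCAL_SIZE", 1))  # size within node
        self.initialized = True

    def shutdown(self):
        self.initialized = False


# Simulated environment of process 9 in a job with 4 machines and 32 GPUs.
ctx = CommContext()
ctx.init({
    "OMPI_COMM_WORLD_RANK": "9",
    "OMPI_COMM_WORLD_SIZE": "32",
    "OMPI_COMM_WORLD_LOCAL_RANK": "1",
    "OMPI_COMM_WORLD_LOCAL_SIZE": "8",
})
print(ctx.rank, ctx.size, ctx.local_rank, ctx.local_size)
ctx.shutdown()
```

Passing the environment as a dictionary keeps the sketch runnable without an MPI launcher; a real job would call `init()` with no argument.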
The interfaces in the communication layer are divided into two simple types of APIs:
- Registration of gradients
- Gradient communication
Before a framework performs gradient reduction communication, it must first register the gradient. The registration parameter is the name that uniquely identifies the gradient in the upper-layer training framework, and the communication layer internally assigns a value to represent that gradient. Gradient communication can then take place: an interface adds the gradient to the communication queue and marks the communication type as broadcast or Allreduce. A callback function must be registered at the same time. When the distributed communication finishes, the callback is invoked to notify the upper-layer training framework that communication for the gradient has completed and that the subsequent parameter update can proceed. None of the details of gradient communication need to be exposed through the interfaces.
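The register-then-communicate flow can be illustrated with a toy, single-process model. The class `GradientQueue` and its method names are hypothetical and are not taken from Ali-Perseus; no real communication happens, and the `process` method only stands in for the background thread so that the callback contract is visible.

```python
from collections import deque


class GradientQueue:
    """Toy, single-process model of the two communication-layer calls."""

    def __init__(self):
        self._ids = {}           # gradient name -> internal integer ID
        self._pending = deque()  # entries of (grad_id, op, tensor, callback)

    def register(self, name):
        # Assign a stable internal ID the first time a name is seen.
        return self._ids.setdefault(name, len(self._ids))

    def enqueue(self, grad_id, tensor, op, callback):
        if op not in ("allreduce", "broadcast"):
            raise ValueError(f"unsupported op: {op}")
        self._pending.append((grad_id, op, tensor, callback))

    def process(self):
        # Stand-in for the background thread: a real implementation would
        # communicate here and pass the reduced tensor to the callback.
        while self._pending:
            grad_id, op, tensor, callback = self._pending.popleft()
            callback(grad_id, tensor)


done = []
queue = GradientQueue()
gid = queue.register("conv1/weight")
queue.enqueue(gid, [0.1, 0.2], "allreduce", lambda g, t: done.append(g))
queue.process()
print(done)  # IDs of gradients whose communication has completed
```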
The following block diagram shows the internal modules in the communication layer.
Ali-Perseus communication modules
The main communication work is done in the main background thread and asynchronous waiting is done in the multi-stream thread pool. This asynchronous multi-stream approach can minimize the global impact of a single slow connection. The following are the main steps:
- The main thread processes the gradients that wait for communication.
- Multiple nodes negotiate the statuses of gradients to see if gradients are ready.
- Determine the list of ready gradients in an order that is fully consistent with the generation order on each node.
- Each node checks whether its list meets the communication granularity requirement.
- Merge the gradients and shard the communication. At this point, pay attention to synchronization with the upper-layer framework: make sure that the framework has already produced the corresponding gradient output before merging. Stream resources for the shards are acquired synchronously. If no stream is available, wait for an earlier stream to finish. If a stream is available, use it to send that shard and notify the corresponding thread in the thread pool.
- Each completed shard of a gradient corresponds to a callback fragment. Check the callback fragments to determine whether the whole gradient has finished communication. If it has, notify the upper-layer training framework that communication for this gradient is complete.
The multi-stream thread pool works as follows:
- Each stream has a dedicated thread that waits for a notification from the main thread. After receiving the notification, the thread waits for the corresponding NCCL CUDA stream to finish, which ensures that the upper half of the communication has completed and that the operations of the lower half have been issued, and then joins the corresponding queue.
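The negotiation, merging, and sharding steps of the main thread can be sketched with three small functions. This is a simplified model under stated assumptions: the function names are hypothetical, sorting by gradient ID stands in for the agreed order, and the granularity rule (merge until a byte threshold is reached) is one plausible reading of the granularity requirement rather than the actual policy.

```python
def common_ready(ready_per_node):
    """Gradient IDs that every node reports ready, in one agreed order (sorted by ID)."""
    common = set(ready_per_node[0])
    for ready in ready_per_node[1:]:
        common &= set(ready)
    return sorted(common)


def merge_batches(grad_ids, sizes, min_bytes):
    """Greedily group gradients until each group reaches min_bytes; the leftover waits."""
    batches, current, current_bytes = [], [], 0
    for gid in grad_ids:
        current.append(gid)
        current_bytes += sizes[gid]
        if current_bytes >= min_bytes:
            batches.append(current)
            current, current_bytes = [], 0
    return batches, current


def shard(total_bytes, num_streams):
    """Split a merged buffer into near-equal (offset, length) shards, one per stream."""
    base, extra = divmod(total_bytes, num_streams)
    shards, offset = [], 0
    for i in range(num_streams):
        length = base + (1 if i < extra else 0)
        shards.append((offset, length))
        offset += length
    return shards


ready = common_ready([[0, 1, 2, 3], [1, 0, 2], [0, 1, 2, 4]])
batches, leftover = merge_batches(ready, {0: 4, 1: 4, 2: 2}, min_bytes=8)
pieces = shard(10, 4)
print(ready, batches, leftover, pieces)
```

Because every node applies the same deterministic rule to the same negotiated set, all nodes arrive at identical batches and shards without further coordination.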
In the communication layer, the underlying primitives that implement the actual communication are encapsulated. Currently, two communication patterns are supported:
- MPI + CUDA
- MPI (not data channel) + NCCL (data channel)
TensorFlow and PyTorch
Given the wide popularity of Horovod and its support for TensorFlow and PyTorch, we chose to make the framework-layer Python interfaces for these two frameworks fully consistent with those of Horovod, so that users can move their existing model code to the Ali-Perseus communication library at almost zero conversion cost. For the part that connects to the framework layer, each framework only needs to provide its own implementations of the virtual Tensor and Context interfaces, which is not a complicated process.
With this in place, a distributed model that uses Horovod only needs its import statement replaced:
# For tensorflow with horovod
sed -i 's/import horovod.tensorflow as/import perseus.tensorflow.horovod as/g' model.py
# For pytorch with horovod
sed -i 's/import horovod.torch as/import perseus.torch.horovod as/g' model.py
Support for MXNet will not be explained here again.
Because Caffe provides relatively primitive distributed support and is not modular, supporting Caffe is more difficult than supporting the other three frameworks. Support for the three preceding frameworks does not require any changes to the framework code, except for a few modifications to MXNet. For Caffe, however, we need to make many modifications to the framework, which mainly include the following:
- Change the single-process and multi-GPU model to the single-process and single-GPU model and launch training on multiple machines and GPUs by using MPI.
- Use the APIs of the Ali-Perseus framework to merge gradients.
- Construct a callback mechanism that allows the Ali-Perseus communication framework to notify the Caffe framework that all the gradients in the entire batch have completed communication.
Ali-Perseus also needs to add the corresponding implementations for Caffe. After the integration, Ali-Perseus can support training on multiple machines and multiple GPUs in Caffe.
After implementing the preceding architecture, we have integrated the distributed communication of all the training frameworks into one framework. All optimization work in this communication framework directly benefits all the training frameworks. Most of our optimization work in the first phase is deep optimization for the VPC network. Optimization in this phase generally falls into two types: bandwidth optimization and latency optimization.
After profiling the NCCL-based implementation in Horovod, we found that it was very difficult to reach the upper limit of the network bandwidth on the cloud. Our analysis confirmed that the main reason was single-stream communication over the TCP network, and tests with iperf provided further evidence. We therefore decided to use sharding and multiple streams. A relatively simple approach is to shard the gradients merged in one round and send the shards over multiple streams. This approach is simple and does not need to process different collections of merged gradients in parallel, but it has one disadvantage: the next round of communication has to wait until all shards of the current round have finished. We further found that individual streams in the multi-stream model have drastically different communication speeds. We therefore finally chose to combine multiple gradient collections with multiple streams. This approach has much more complicated control logic, and the whole procedure has to be split into an upper half and a lower half. The upper half is responsible for merging, reduction within nodes, and communication among nodes. The lower half is responsible for broadcasting results to individual nodes. During this process, the relationships among multiple NCCL streams must be handled carefully to prevent hangs.
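The split into an upper half and a lower half can be illustrated with a pure-Python simulation of a hierarchical Allreduce. This is only a data-flow sketch: the function name is hypothetical, nested lists stand in for GPU buffers, inter-node communication is simulated as a plain sum, and the lower half is assumed to deliver the result to every GPU on every node. Streams, sharding, and NCCL are left out entirely.

```python
def hierarchical_allreduce(grads):
    """grads[node][gpu] is a list of floats; returns the same shape holding the global sums."""
    # Upper half, step 1: reduce within each node into one buffer per node.
    node_sums = [[sum(vals) for vals in zip(*node)] for node in grads]
    # Upper half, step 2: communicate among nodes (simulated here as a plain sum).
    total = [sum(vals) for vals in zip(*node_sums)]
    # Lower half: broadcast the result back to every GPU on every node.
    return [[list(total) for _ in node] for node in grads]


# Two nodes with two GPUs each, every GPU holding a two-element gradient.
grads = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[5.0, 6.0], [7.0, 8.0]],
]
reduced = hierarchical_allreduce(grads)
print(reduced[0][0])  # prints [16.0, 20.0]
```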
Latency is generally not a major issue in the distributed training of deep learning models. However, this is not the case for large-scale distributed training, where gradient negotiation ahead of gradient data communication creates an artificial single-point hot spot. In Horovod, all nodes negotiate with the root node, and the root node then coordinates all nodes. The burden on the root node increases significantly as the number of nodes grows. One of our customers runs 320 nodes in a CTR scenario, where the negotiation overhead at the root node caused scalability to be lost completely. By redesigning the negotiation algorithm and abandoning the central point-to-point negotiation model of Horovod, we have reduced the complexity of gradient negotiation by one or two orders of magnitude, and scalability is achieved in the aforementioned CTR scenario.
After these two optimizations, we found that some parameters need fine-tuning, such as the merging granularity and the sharding granularity. These parameters are very hard to configure offline, so the optimal configuration must be found dynamically. The following is our main optimization work related to communication:
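The article does not describe the decentralized negotiation algorithm, so the following sketch uses recursive doubling, a well-known Allreduce pattern, purely as an illustration of why removing the root can cut per-node load. It assumes that each node encodes gradient readiness as a bitmask and that the node count is a power of two; both function names are hypothetical.

```python
def centralized_root_messages(num_nodes):
    """Messages handled by the root per round: one report in and one reply out per worker."""
    return 2 * (num_nodes - 1)


def recursive_doubling_and(masks):
    """AND-combine readiness bitmasks so that every node ends with the global result.

    Requires a power-of-two node count. Returns (final masks, exchange rounds per node).
    """
    n = len(masks)
    if n == 0 or n & (n - 1):
        raise ValueError("node count must be a power of two")
    masks = list(masks)
    rounds, step = 0, 1
    while step < n:
        # Each node exchanges its mask with the partner whose rank differs in one bit.
        masks = [masks[rank] & masks[rank ^ step] for rank in range(n)]
        step *= 2
        rounds += 1
    return masks, rounds


# Bit i set means gradient i is ready on that node.
node_masks = [0b1111, 0b1011, 0b1110, 0b1111, 0b1011, 0b1111, 0b1111, 0b1010]
final_masks, rounds = recursive_doubling_and(node_masks)
print(bin(final_masks[0]), rounds, centralized_root_messages(len(node_masks)))
```

In this model each of the 8 nodes takes part in 3 exchanges, whereas a central root would handle 14 messages; the gap widens as the node count grows.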
- Multi-stream and self-adaptive gradient merging. The granularity of merging multiple gradients and the sharding granularity use self-adaptive algorithms. In the early stages of training, an optimal merging granularity and an optimal sharding granularity (the number of streams) are selected from a parameter space in the current network environment.
- Gradient compression. Currently, only FP16 compression is supported. Scaling can be applied as needed to prevent loss of precision. Gradient compression is made pluggable so that users can customize gradient compression algorithms of different depths.
- Decentralization of gradient negotiation. The point-to-point negotiation mechanism in Horovod will cause serious scalability problems in a large-scale node scenario. The decentralized negotiation mechanism in Ali-Perseus can perfectly solve this problem.
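The FP16 compression item, including the role of scaling, can be illustrated with the Python standard library, whose `struct` module supports IEEE 754 half precision through the `e` format code. This is a minimal sketch with hypothetical function names and an arbitrarily chosen scale factor; it is not the Ali-Perseus compression code.

```python
import struct


def compress_fp16(values, scale=1.0):
    """Scale float gradients and pack them as little-endian half-precision bytes."""
    return struct.pack(f"<{len(values)}e", *(v * scale for v in values))


def decompress_fp16(payload, scale=1.0):
    """Unpack half-precision bytes and undo the scaling."""
    count = len(payload) // 2
    return [v / scale for v in struct.unpack(f"<{count}e", payload)]


tiny = [1e-8]  # smaller than the smallest positive half-precision value
unscaled = decompress_fp16(compress_fp16(tiny))
scaled = decompress_fp16(compress_fp16(tiny, scale=1024.0), scale=1024.0)
print(unscaled, scaled)
```

Without scaling, the tiny gradient underflows to zero; with scaling, it survives with a small relative error. The scale must be chosen so that the largest scaled gradient stays within the half-precision range, because `struct.pack` raises `OverflowError` for values that are too large.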
After implementing the overall architecture and optimizing communication performance, we compared the distributed training performance of different libraries. For most network models, Horovod performs better than the frameworks' own distributed implementations, and Ali-Perseus performs better than Horovod. Generally, Ali-Perseus shows obvious performance advantages in the following two cases:
- Relatively large network models, such as BERT for NLP and deep insight for facial recognition.
- Distributed training in the case of 16 or more nodes.
The following charts show performance tests and performance comparison in the process of supporting our customers. Some network models using Ali-Perseus have been used in the production environment of our customers.
TensorFlow BERT Base Model
TensorFlow BERT Base Model with Ali-Perseus shows 200%-500% better performance than Horovod
The scalability of ResNet-18 (synthetic data) in Ali-Perseus on 4 machines and 32 GPUs is increased to 94.2% (86.4% in MXNet)
The scalability of ResNet-18 (real data) in Ali-Perseus on 4 machines and 32 GPUs is increased to 87.4% (75.9% in MXNet)
The scalability of ResNet-50 on 4 machines and 32 GPUs is increased to 91.9% (79.4% in MXNet)
The facial recognition model shows 200%-250% better performance in Ali-Perseus than in MXNet
Customer Case Study
This universal distributed communication framework decouples the distributed optimizations of deep learning frameworks from the frameworks themselves and allows us to perform deep optimizations for the Alibaba Cloud infrastructure in one unified framework. The deep VPC network optimization in the first phase improves the multi-machine scalability of TensorFlow, MXNet, PyTorch, and Caffe on Alibaba Cloud, significantly improves platform optimization efficiency, and reduces the subsequent maintenance workload. Currently, customers are at the following stages of using Ali-Perseus:
- Customer A has already begun to use Ali-Perseus with TensorFlow for BERT pre-training.
- Customer B is evaluating Ali-Perseus + MXNet. Customer B reports that Ali-Perseus integrates smoothly with its own MXNet build. The test model on 4 machines and 32 GPUs shows a 10% performance increase and almost linear scaling. The customer plans to perform further tests.
Our customers can keep their original open-source training frameworks while benefiting from the performance advantages of Ali-Perseus, which is an acceptable solution for most of them. The IaaS resources that customers have purchased gain performance directly, and the heterogeneous computing products from Alibaba Cloud become more cost-effective.
The latest release, Ali-Perseus 0.9.4, supports Python 2.7 and 3.6, CentOS 7, and Ubuntu 16.04, as shown in the following figure. The following are the main features of Ali-Perseus:
- Support for multiple training frameworks (TensorFlow, PyTorch, MXNet, and Caffe)
- Loose coupling with training frameworks and support for customers’ private training frameworks
- Self-adaptive network optimization and multi-stream support
- Gradient compression (support for float16)
- Decentralized gradient negotiation
- NaN check
Main components of Ali-Perseus training communication framework