How to Execute Mars in a Distributed Manner
We have already introduced what Mars is in a previous article, and have made it open source on GitHub after testing it on our internal systems. This article introduces the distributed execution architecture implemented by Mars.
Mars provides a library for distributed execution of tensors. The library is written using the actor model implemented by mars.actors and includes schedulers, workers and web services.
The graph submitted by the client to the Mars web service consists of tensors. The web service receives the graph and submits it to a scheduler. Before submitting jobs to workers, the Mars scheduler compiles the tensor graph into a graph composed of chunks and operands, and then analyzes and splits that graph. The scheduler then creates a series of OperandActors, each controlling the execution of a single operand, and distributes them across all schedulers using consistent hashing. Operands are scheduled in topological order. Once all operands have been executed, the entire graph is marked as completed, and the client can pull the results from the web service. The entire execution process is shown in the following figure.
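The text says that OperandActors are spread over the schedulers by consistent hashing but gives no details. The following is a minimal sketch of that general technique, not Mars's implementation: the HashRing class, the scheduler addresses, the operand key and the number of virtual nodes are all hypothetical.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    """Map a string to a stable integer position on the hash ring."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


class HashRing:
    """A minimal consistent-hash ring with virtual nodes (illustrative only)."""

    def __init__(self, nodes, replicas=64):
        # Each node owns `replicas` points on the ring to smooth the distribution.
        self._ring = sorted(
            (_hash(f"{node}#{i}"), node) for node in nodes for i in range(replicas)
        )
        self._points = [point for point, _ in self._ring]

    def lookup(self, key: str) -> str:
        """Return the node owning the first ring point at or after the key's hash."""
        idx = bisect.bisect_left(self._points, _hash(key)) % len(self._ring)
        return self._ring[idx][1]


# Hypothetical scheduler addresses and operand key.
schedulers = ["scheduler-0:7103", "scheduler-1:7103", "scheduler-2:7103"]
ring = HashRing(schedulers)
owner = ring.lookup("operand-key-abc")
```

The useful property is that removing one scheduler only moves the keys that scheduler owned; every other operand key keeps its owner.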
Submitting a Job
The client submits jobs to the Mars service through the RESTful API. The client writes code that operates on tensors, converts the tensor operations into a graph composed of tensors through session.run(tensor), and submits the graph to the web API. After that, the web API submits the job to the SessionActor, and a GraphActor is created in the cluster for graph analysis and management. The client then polls the execution state of the graph until the execution ends.
In the GraphActor, we first convert the tensor graph into a graph composed of operands and chunks according to the chunks settings. This allows the graph to be further split and executed in parallel. Afterwards, we conduct a series of analyses on the graph to obtain the priority of operands and assign workers to the initial operands. For details about this part, see the section “Preparing the Execution Graph”. Then, an OperandActor is created for each operand to control its execution. When the operand is in the READY state (as described in the Operand State section), the scheduler selects the target worker for the operand, and then the job is submitted to the worker for actual execution.
Controlling the Execution
When an operand is submitted to a worker, the OperandActor waits for a callback from the worker. If the operand is executed successfully, its successors are scheduled. If the operand fails to execute, the OperandActor retries it several times. If it still fails, the execution is marked as failed.
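The retry behavior described above can be sketched as a small loop. This is only an illustration under assumptions: the function name, the retry budget of 3 and the returned state labels are hypothetical, since the text only says the operand is retried "several times".

```python
def run_with_retries(submit, max_retries=3):
    """Call submit() until it succeeds or the retry budget is exhausted.

    Returns ("FINISHED", result) on success or ("FATAL", last_error) on failure.
    """
    last_error = None
    for _attempt in range(1 + max_retries):
        try:
            return "FINISHED", submit()
        except Exception as exc:  # a real system would retry only recoverable errors
            last_error = exc
    return "FATAL", last_error


attempts = []


def flaky_operand():
    """Hypothetical operand that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("worker lost")
    return "chunk-data"


state, value = run_with_retries(flaky_operand)
```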
Canceling a Job
Clients can cancel running jobs using the RESTful API. The cancel request is written to the state storage of the graph, and the cancel interface on the GraphActor is called. If the job is in the preparation phase, it ends immediately after the cancel request is detected. Otherwise, the request is sent to each OperandActor and the operand state is set to CANCELLING. If the operand is not running at this time, its state is directly set to CANCELLED. If the operand is running, the stop request is sent to the worker and causes an ExecutionInterrupted error, which is returned to the OperandActor. At this point, the operand state is marked as CANCELLED.
Preparing the Execution Graph
When a tensor graph is submitted to the Mars scheduler, a finer-grained graph composed of operands and chunks is generated according to the chunks parameters contained in the data source.
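To make the idea of chunking concrete, here is a small sketch of how a tensor shape could be tiled into chunk shapes. The helper names and the per-axis chunk tuple are assumptions for illustration and do not reflect Mars's internal tiling code.

```python
import itertools


def split_dimension(size, chunk):
    """Split one axis of length `size` into pieces of at most `chunk` elements."""
    return [min(chunk, size - start) for start in range(0, size, chunk)]


def chunk_shapes(shape, chunks):
    """Return {chunk_index: chunk_shape} for a tensor of `shape` tiled by `chunks`."""
    per_axis = [split_dimension(s, c) for s, c in zip(shape, chunks)]
    result = {}
    for index in itertools.product(*(range(len(p)) for p in per_axis)):
        result[index] = tuple(p[i] for p, i in zip(per_axis, index))
    return result


# A 10 x 7 tensor tiled with 4 x 4 chunks yields a 3 x 2 grid of chunks.
tiles = chunk_shapes((10, 7), (4, 4))
```

Each entry in the result would correspond to one chunk in the finer-grained graph, with the operand that produces it attached.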
Compressing a Graph
After the chunk graph is generated, we scale down the size of the graph by fusing the adjacent nodes in the graph. This fusing also enables us to make full use of accelerated libraries, such as numexpr, to accelerate the computation process. Currently, Mars only fuses operands that form a single chain. For example, when the following code is executed:
import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()
Mars fuses the ADD and SUM operands into a FUSE node. The RAND operands are not fused, because they do not form a single chain with ADD and SUM.
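The single-chain rule can be sketched on a plain dictionary graph. This is a hedged illustration rather than Mars's fusion code: the fuse_chains function and the node names are hypothetical, and the rule assumed here is that two adjacent nodes are fused when the first has exactly one successor and that successor has exactly one predecessor.

```python
def fuse_chains(successors):
    """Group nodes of a DAG into maximal single chains.

    `successors` maps every node to the list of nodes consuming its output.
    """
    predecessors = {node: [] for node in successors}
    for node, succs in successors.items():
        for succ in succs:
            predecessors[succ].append(node)

    def continues_chain(node):
        # True when `node` can be appended to a chain ending at its only predecessor.
        preds = predecessors[node]
        return len(preds) == 1 and len(successors[preds[0]]) == 1

    chains = []
    for node in successors:
        if continues_chain(node):
            continue  # not a chain head; it is reached from its head instead
        chain = [node]
        while len(successors[chain[-1]]) == 1 and continues_chain(successors[chain[-1]][0]):
            chain.append(successors[chain[-1]][0])
        chains.append(chain)
    return chains


# The graph for c = (a + b).sum() from the example above.
graph = {"rand_a": ["add"], "rand_b": ["add"], "add": ["sum"], "sum": []}
chains = fuse_chains(graph)
```

Here the two rand nodes stay on their own because add has two predecessors, while add and sum end up in one chain.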
Allocating an Initial Worker
How workers are allocated to operands is critical to the performance of graph execution. Random allocation of initial operands may result in huge network overhead and an imbalance of jobs among workers. The allocation of non-initial nodes can be easily determined according to the physical distribution of the data generated by their predecessors and the idle state of each worker. Therefore, in the preparation phase of the execution graph, we only consider the allocation of initial operands.
We follow several principles for initial worker allocation. First, the operands allocated to each worker need to be balanced as much as possible. This enables the computing cluster to have a higher utilization rate during the entire execution phase, which is especially important in the final phase of execution. Second, the initial node allocation should require minimal network traffic when subsequent nodes are executed. In other words, the initial node allocation should fully follow the locality principle.
Note that these principles may conflict with each other in some cases. An allocation solution with minimal network traffic may be very skewed. We have developed a heuristic algorithm to strike a balance between the two goals. The algorithm is described as follows:
1. The first initial node and the first machine in the list are selected;
2. In the undirected graph converted from the operand graph, a depth-first search is started from the node;
3. If another unallocated initial node is accessed, we allocate it to the machine selected in step 1;
4. When the total number of operands accessed exceeds the average number of operands accepted by each worker, the allocation is stopped;
5. If there are still workers that have not been allocated operands, go to step 1. Otherwise, the computation ends.
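The heuristic above can be sketched as follows. This is one possible reading under stated assumptions, not Mars's code: the function name is hypothetical, each pass starts from the first initial node that is still unallocated, and any initial nodes left over once every worker has been visited are given to the least loaded worker, a detail the text does not specify.

```python
from collections import Counter


def assign_initial_workers(successors, workers):
    """Assign initial operands (those without predecessors) to workers."""
    # Build the undirected adjacency and record which nodes have predecessors.
    neighbours = {node: set() for node in successors}
    has_pred = set()
    for node, succs in successors.items():
        for succ in succs:
            neighbours[node].add(succ)
            neighbours[succ].add(node)
            has_pred.add(succ)
    initials = [n for n in successors if n not in has_pred]
    quota = len(successors) / len(workers)  # average operands per worker

    assignment = {}
    for worker in workers:
        start = next((n for n in initials if n not in assignment), None)
        if start is None:
            break
        visited, stack = set(), [start]
        # Depth-first search; stop once more than `quota` operands were visited.
        while stack and len(visited) <= quota:
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            if node not in has_pred and node not in assignment:
                assignment[node] = worker
            stack.extend(sorted(neighbours[node] - visited))

    # Assumption: leftovers go to the worker with the fewest initial operands.
    for node in initials:
        if node not in assignment:
            load = Counter(assignment.values())
            assignment[node] = min(workers, key=lambda w: load[w])
    return assignment


# Two independent reductions: a1, a2 feed s1 and b1, b2 feed s2.
graph = {"a1": ["s1"], "a2": ["s1"], "b1": ["s2"], "b2": ["s2"], "s1": [], "s2": []}
assignment = assign_initial_workers(graph, ["w0", "w1"])
```

In this example the inputs of each reduction land on the same worker, so neither sum needs to fetch data over the network, and both workers receive the same number of initial operands.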
When a graph composed of operands is executed, an appropriate execution sequence reduces the amount of data temporarily stored in the cluster, thus reducing the possibility of data being dumped to disk. Selecting an appropriate worker reduces the total network traffic during execution.
Selection Policy for Operands
An appropriate execution sequence can significantly reduce the total amount of data temporarily stored in the cluster. The following figure shows an example of tree reduction. Circles represent operands and squares represent chunks. Red marks an operand that is being executed, blue marks an operand that is ready to be executed, green marks chunks that have been generated and stored, and gray marks operands and data that have been released. Assuming that we have two workers and each operand uses the same amount of resources, each figure below shows the state after 5 time units of execution under a different policy. In the figure on the left, nodes are executed level by level; in the figure on the right, nodes are executed in depth-first order. The left graph needs to temporarily store the data of 6 chunks, while the right graph needs to store only 2.
Our goal is to reduce the total amount of data stored in the cluster, so we have set a priority policy for operands in the READY state:
- Operands with a larger depth need to be executed first;
- Operands that deeper operands depend on need to be executed first;
- Nodes with smaller output sizes need to be executed first.
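One way to express these three rules is as a sort key. The sketch below assumes the rules are applied in the listed order as successive tie-breakers, which the text does not state explicitly; the ReadyOperand class and its field names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ReadyOperand:
    key: str
    depth: int                # distance from the initial operands
    max_successor_depth: int  # depth of the deepest operand depending on this one
    output_size: int          # estimated output size in bytes


def priority_key(op):
    """Smaller tuples are scheduled first."""
    return (-op.depth, -op.max_successor_depth, op.output_size)


ready = [
    ReadyOperand("a", depth=1, max_successor_depth=2, output_size=800),
    ReadyOperand("b", depth=2, max_successor_depth=3, output_size=800),
    ReadyOperand("c", depth=2, max_successor_depth=4, output_size=800),
    ReadyOperand("d", depth=2, max_successor_depth=4, output_size=100),
]
order = [op.key for op in sorted(ready, key=priority_key)]
```

With this data, "d" and "c" run before "b" because deeper operands depend on them, and "d" runs before "c" because its output is smaller.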
Selection Policy for Workers
When the scheduler is ready to execute the graph, the workers of the initial operands have already been determined. We allocate workers for subsequent operands based on where their input data is located. The worker that holds the largest amount of input data is selected to execute the operand. If multiple workers hold the same amount of input data, the resource state of each candidate worker is the deciding factor.
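A minimal sketch of this selection rule follows. The function and the free_slots measure of idle capacity are hypothetical; the text only says that the resource state decides between equally good candidates.

```python
def select_worker(input_sizes, free_slots):
    """Pick the worker holding the most input bytes; break ties by idle capacity.

    `input_sizes` maps worker -> bytes of the operand's input stored there.
    `free_slots` maps worker -> a hypothetical measure of idle capacity.
    """
    return max(input_sizes, key=lambda w: (input_sizes[w], free_slots.get(w, 0)))


# The worker with more local input wins even if it is busier.
by_locality = select_worker({"w0": 100, "w1": 300}, {"w0": 8, "w1": 1})
# With equal input sizes, the worker with more idle capacity wins.
by_resources = select_worker({"w0": 300, "w1": 300}, {"w0": 1, "w1": 4})
```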
Operand State
Each operand in Mars is scheduled separately by an OperandActor. Its execution is a process of state transitions. In the OperandActor, we define a state transition function for entering each state. An initial operand is in the READY state at initialization, while a non-initial operand is in the UNSCHEDULED state at initialization. When the given conditions are met, the operand transitions to another state and the corresponding operations are performed. The state transitions are shown in the following figure:
The following describes the meaning of each state and the operations performed by Mars in these states.
- UNSCHEDULED: An operand is in this state when its upstream data is not ready.
- READY: An operand is in this state when all upstream input data is ready. After it enters this state, the OperandActor submits jobs to all workers selected by the AssignerActor. If a worker is ready to run a job, it sends a message to the scheduler. The scheduler then sends a stop message to the other workers and a message to that worker to start the job execution.
- RUNNING: An operand is in this state when its execution has been started. When it enters this state, the OperandActor checks whether the job has been submitted. If not, the OperandActor constructs a graph composed of FetchChunk operands and the current operand, and submits it to the worker. Afterwards, the OperandActor registers a callback in the worker to receive the message indicating that the job is completed.
- FINISHED: An operand is in this state when the job is completed. When an operand enters this state and has no successors, a message is sent to the GraphActor to determine whether the execution of the entire graph has ended. At the same time, the OperandActor sends a message to its precursors and successors that the execution is completed. If a precursor receives the message, it checks whether all the successors have been completed. If so, the data on the current operand can be released. If a successor receives the message, it checks whether all the precursors have been completed. If so, the successor state can be transitioned to READY.
- FREED: An operand is in this state when all its data has been released.
- FATAL: An operand is in this state when all attempts to re-execute it fail. When an operand enters this state, it passes the same state to its successors.
- CANCELLING: An operand is in this state when it is being canceled. If the job is currently executing, a request to cancel execution is sent to the worker.
- CANCELLED: An operand is in this state when the execution is canceled and stopped. When an operand enters this state, the OperandActor attempts to transition the state of all successors to CANCELLING.
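The states above can be summarized as a transition table. The table below is an assumption derived from the descriptions in this section, not the table Mars itself uses, and the helper functions are hypothetical.

```python
from enum import Enum


class OperandState(Enum):
    UNSCHEDULED = "unscheduled"
    READY = "ready"
    RUNNING = "running"
    FINISHED = "finished"
    FREED = "freed"
    FATAL = "fatal"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"


S = OperandState

# Allowed moves as read from the text (an assumption, not Mars's own table).
TRANSITIONS = {
    S.UNSCHEDULED: {S.READY, S.FATAL, S.CANCELLING},
    S.READY: {S.RUNNING, S.CANCELLING},
    S.RUNNING: {S.FINISHED, S.FATAL, S.CANCELLING},
    S.FINISHED: {S.FREED},
    S.CANCELLING: {S.CANCELLED},
    S.FREED: set(),
    S.FATAL: set(),
    S.CANCELLED: set(),
}


def transition(current, target):
    """Return the new state, or raise ValueError if the move is not allowed."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


def initial_state(has_predecessors):
    """Initial operands start READY; all others start UNSCHEDULED."""
    return S.UNSCHEDULED if has_predecessors else S.READY


# Walk one operand through a successful execution.
state = initial_state(has_predecessors=True)
for nxt in (S.READY, S.RUNNING, S.FINISHED, S.FREED):
    state = transition(state, nxt)
```

Keeping the allowed moves in one table makes illegal transitions, such as leaving FREED, fail loudly instead of silently corrupting the graph state.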
Execution Details in Workers
A Mars worker contains multiple processes to reduce the impact of the Python GIL on execution. The actual computation is carried out in independent processes. To reduce unnecessary memory copying and inter-process communication, a Mars worker uses shared memory to store execution results.
When a job is submitted to a worker, it is first placed in a queue to wait for memory allocation. Once memory is allocated, input data located on other workers, or dumped to disk on the current worker, is loaded into memory. At this point, all the data required for computing is in memory, and the actual computation can start. When the computation is completed, the worker puts the results into shared storage. The transition relationships of the four execution states are shown in the figure below.
Controlling the Execution
A Mars worker controls the execution of all operands in the worker through an ExecutionActor. The actor itself is not involved in actual operations or data transfers, and only submits tasks to other actors.
The OperandActor in the scheduler submits jobs to the worker through the enqueue_graph call on the ExecutionActor. The worker accepts the submitted operands and caches them in a queue. When a job can be executed, the ExecutionActor sends a message to the scheduler, which decides whether to execute it. When the scheduler decides to execute the operand on the current worker, it calls the start_execution method and registers a callback through add_finish_callback. This design allows execution results to be received at multiple locations, which is valuable for fault recovery.
The ExecutionActor uses the mars.promise module to process execution requests from multiple operands simultaneously. The specific execution steps are linked through the "then" method of the Promise class. When the final execution result is stored, the previously registered callback is triggered. If an error occurs in any of the previous execution steps, the error is passed to the handler function registered by the catch method and processed.
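The chaining behavior of "then" and "catch" can be illustrated with a tiny synchronous stand-in. MiniPromise below is hypothetical and is not the mars.promise API, which works asynchronously with actors; the step functions are placeholders for the execution steps.

```python
class MiniPromise:
    """A tiny synchronous stand-in for a promise (not the mars.promise API)."""

    def __init__(self, value=None, error=None):
        self.value, self.error = value, error

    def then(self, func):
        """Run func on the value unless an earlier step has already failed."""
        if self.error is not None:
            return self  # skip this step and carry the error forward
        try:
            return MiniPromise(value=func(self.value))
        except Exception as exc:
            return MiniPromise(error=exc)

    def catch(self, handler):
        """Run handler on the error, if any, and continue with its result."""
        if self.error is None:
            return self
        return MiniPromise(value=handler(self.error))


log = []


def allocate_memory(op):
    log.append("alloc")
    return op


def load_inputs(op):
    log.append("load")
    raise RuntimeError("input chunk missing")


def compute(op):
    log.append("compute")
    return op


result = (
    MiniPromise("op-1")
    .then(allocate_memory)
    .then(load_inputs)
    .then(compute)
    .catch(lambda err: f"failed: {err}")
)
```

Because load_inputs raises, compute is skipped and the error reaches the handler registered with catch, which mirrors the error propagation described above.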
Sorting of Operands
All operands in the READY state are submitted to the workers selected by the scheduler. Therefore, for most of the execution time, the number of operands submitted to a worker is larger than the number of operands that the worker can handle at once. The worker needs to sort the operands and then select some of them to execute. This sorting is performed in the TaskQueueActor, which maintains a priority queue that stores information about the operands. The TaskQueueActor also regularly runs a job allocation task, which allocates execution resources to the operands at the top of the priority queue until there are no resources left to run more operands. This allocation is also triggered when a new operand is submitted or when the execution of an operand is completed.
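A priority queue with slot-based allocation can be sketched with the standard heapq module. The TaskQueue class and its fixed number of slots are assumptions for illustration; the periodic allocation task mentioned above is omitted, and allocation runs only on submit and finish.

```python
import heapq
import itertools


class TaskQueue:
    """Priority queue that hands out a fixed number of execution slots."""

    def __init__(self, total_slots):
        self._free_slots = total_slots
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order
        self.running = []

    def submit(self, op_key, priority):
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), op_key))
        self._allocate()

    def finish(self, op_key):
        self.running.remove(op_key)
        self._free_slots += 1
        self._allocate()

    def _allocate(self):
        # Start queued operands, highest priority first, while slots remain.
        while self._free_slots > 0 and self._heap:
            _, _, op_key = heapq.heappop(self._heap)
            self._free_slots -= 1
            self.running.append(op_key)


queue = TaskQueue(total_slots=2)
queue.submit("a", priority=1)
queue.submit("b", priority=5)
queue.submit("c", priority=3)  # queued: both slots are taken
queue.submit("d", priority=9)  # queued
queue.finish("a")              # frees a slot; "d" has the highest priority
```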
A Mars worker manages two kinds of memory. The first is the private memory of each worker process, which is held by the process itself. The second is the memory shared by all processes, which is held by plasma_store in Apache Arrow.
To avoid process memory overflow, we have introduced a worker-level QuotaActor to allocate process memory. Before an operand starts executing, it sends a batch of memory requests to the QuotaActor for input and output chunks. If the remaining memory space can meet the request, the request is accepted by the QuotaActor. Otherwise, requests are queued to wait for free resources. When the relevant memory is released, the requested resources are also released. At this point, the QuotaActor can allocate resources to other operands.
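The accept-or-queue behavior can be sketched as a small bookkeeping class. This is an illustration under assumptions, not the QuotaActor itself: the Quota class is hypothetical, requests are served in first-in, first-out order, and each operand makes a single request covering all of its chunks.

```python
from collections import deque


class Quota:
    """Grant memory requests while capacity remains; queue the rest in FIFO order."""

    def __init__(self, total_bytes):
        self._free = total_bytes
        self._granted = {}
        self._waiting = deque()

    def request(self, op_key, nbytes):
        """Return True if granted immediately, False if the request was queued."""
        if not self._waiting and nbytes <= self._free:
            self._free -= nbytes
            self._granted[op_key] = nbytes
            return True
        self._waiting.append((op_key, nbytes))
        return False

    def release(self, op_key):
        """Return an operand's quota and grant queued requests that now fit."""
        self._free += self._granted.pop(op_key)
        while self._waiting and self._waiting[0][1] <= self._free:
            key, nbytes = self._waiting.popleft()
            self._free -= nbytes
            self._granted[key] = nbytes

    def holds(self, op_key):
        return op_key in self._granted


quota = Quota(total_bytes=100)
first = quota.request("a", 60)   # fits: granted at once
second = quota.request("b", 60)  # does not fit: queued
quota.release("a")               # "b" is granted once "a" releases its quota
```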
The shared memory is managed by plasma_store, which usually takes up 50% of the total memory. There is no possibility of overflow, so this part of memory is allocated directly through the relevant plasma_store method without going through the QuotaActor. When the shared memory is used up, the Mars worker tries to dump some unused chunks to the disk to free up space for new chunks.
Chunks dumped from the shared memory to the disk may be reused by subsequent operands. However, reloading the data from the disk into the shared memory can be very IO-intensive, especially when the shared memory is exhausted and other chunks need to be dumped to the disk to make room for the loaded chunks. Therefore, when data sharing is not needed (for example, the chunk is only used by one operand), we load the chunk directly into the private memory of the process, instead of the shared memory. This can significantly reduce the total execution time of the job.
Mars is currently undergoing rapid iteration. We are considering implementing worker-level failover and shuffle support in the near future, and scheduler-level failover is also being planned.