How to Execute Mars in a Distributed Manner


Mars provides a library for distributed execution of tensors. The library is written using the actor model implemented by mars.actors and includes schedulers, workers and web services.

Submitting a Job

The client submits jobs to the Mars service through the RESTful API. The client writes code using tensors, converts the tensor operations into a graph composed of tensors, and submits it through the web API. After that, the web API submits the job to the SessionActor and creates a GraphActor in the cluster for graph analysis and management. The client then polls the execution state of the graph until the execution ends.

Controlling the Execution

When an operand is submitted to a worker, the OperandActor waits for a callback from the worker. If the operand executes successfully, its successors are scheduled. If the operand fails to execute, the OperandActor retries several times; if it still fails, the execution is marked as failed.
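The retry behavior described above can be sketched as follows; the names (`run_with_retries`, `MAX_RETRIES`, `submit_to_worker`) are illustrative assumptions, not Mars's actual API:

```python
# Hypothetical sketch of the retry logic: resubmit on failure up to a
# fixed number of attempts, then mark the execution as failed.
MAX_RETRIES = 3

def run_with_retries(submit_to_worker, operand):
    """Submit an operand, retrying on failure; return the final state."""
    for _attempt in range(MAX_RETRIES):
        try:
            submit_to_worker(operand)
            return "FINISHED"   # success: successors can now be scheduled
        except RuntimeError:
            continue            # transient failure: try again
    return "FATAL"              # all attempts failed
```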

Canceling a Job

Clients can cancel running jobs through the RESTful API. The cancel request is written to the state storage of the graph, and the cancel interface of the GraphActor is called. If the job is still in the preparation phase, it ends as soon as the stop request is detected; otherwise, the request is sent to every OperandActor and the state is set to CANCELLING. If the operand is not running at this time, its state is set directly to CANCELLED. If the operand is running, a stop request is sent to the worker, causing an ExecutionInterrupted error that is returned to the OperandActor, and the operand state is then marked as CANCELLED.

Preparing the Execution Graph

When a tensor graph is submitted to the Mars scheduler, a finer-grained graph composed of operands and chunks is generated according to the chunks parameter of the data sources.
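As a rough illustration of how a coarse-grained tensor expands into chunks, the boundaries along one axis can be computed like this (`split_axis` is a sketch for illustration, not a Mars function):

```python
# Illustrative only: compute the chunk boundaries for one tensor axis,
# mirroring how a coarse tensor graph expands into a chunk graph.
def split_axis(size, chunk_size):
    """Return (start, stop) index pairs covering an axis of `size`."""
    return [(i, min(i + chunk_size, size)) for i in range(0, size, chunk_size)]
```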

Compressing a Graph

After the chunk graph is generated, we scale down the size of the graph by fusing adjacent nodes. Fusing also enables us to make full use of accelerated libraries, such as numexpr, to speed up computation. Currently, Mars only fuses operands that form a single chain. For example, when the following code is executed:

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()
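In this example, the element-wise add and the sum form a single chain, so they can be fused into one node. A minimal sketch of such single-chain detection on a toy operand DAG (the graph representation and the `fuse_chains` helper are assumptions for illustration, not Mars internals):

```python
# A minimal sketch of single-chain fusion on a tiny operand DAG.
def fuse_chains(succs):
    """succs maps node -> list of successors. Fuse runs of nodes where each
    node has exactly one successor and that successor has exactly one
    predecessor; return the list of fused chains."""
    preds = {n: [] for n in succs}
    for n, outs in succs.items():
        for m in outs:
            preds[m].append(n)
    chains, seen = [], set()
    for n in succs:
        # start a chain only at a node that cannot be absorbed upstream
        if n in seen or (len(preds[n]) == 1 and len(succs[preds[n][0]]) == 1):
            continue
        chain = [n]
        while len(succs[chain[-1]]) == 1:
            nxt = succs[chain[-1]][0]
            if len(preds[nxt]) != 1:
                break
            chain.append(nxt)
        seen.update(chain)
        chains.append(chain)
    return chains
```

On the graph from the code above (`a` and `b` feeding an add, whose only consumer is the sum), the add and sum collapse into one chain while the two random sources stay separate.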

Allocating an Initial Worker

Allocating workers to operands is critical to the performance of graph execution. Randomly allocating initial operands may result in huge network overhead and imbalanced job allocation among workers. The allocation of non-initial nodes can be easily determined from the physical distribution of the data generated by their precursors and the idle state of each worker. Therefore, in the preparation phase of the execution graph, we only consider the allocation of the initial operands, as follows:

  1. The first initial node and the first machine in the list are selected;
  2. In the undirected graph converted from the operand graph, depth-first search is started from the node;
  3. If another unallocated initial node is accessed, we allocate it to the machine selected in step 1;
  4. When the total number of operands accessed exceeds the average number of operands accepted by each worker, the allocation is stopped;
  5. If there are still workers that have not been allocated operands, go to step 1; otherwise, the allocation ends.
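The steps above can be sketched as follows, assuming an undirected adjacency map for the converted graph; all names are illustrative, and the quota rule is a simplification of step 4:

```python
# Sketch of initial-worker allocation via depth-first search.
def allocate_initial(adj, initial_nodes, workers):
    """adj: node -> list of neighbors in the undirected graph.
    Returns a mapping of initial node -> assigned worker."""
    quota = max(1, len(adj) // len(workers))    # avg operands per worker
    assigned = {}
    unassigned = list(initial_nodes)            # preserves "first node" order
    for worker in workers:                      # step 1: pick node + machine
        if not unassigned:
            break
        stack, visited, count = [unassigned[0]], set(), 0
        while stack and count < quota:          # step 4: stop at the quota
            node = stack.pop()                  # step 2: depth-first search
            if node in visited:
                continue
            visited.add(node)
            count += 1
            if node in unassigned:              # step 3: assign initial nodes
                assigned[node] = worker
                unassigned.remove(node)
            stack.extend(adj.get(node, ()))
    return assigned
```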

Scheduling Policy

When a graph composed of operands is executed, an appropriate execution sequence reduces the amount of data temporarily stored in the cluster, thus reducing the possibility of data being dumped to disk. An appropriate worker can reduce the total network traffic during execution.

Selection Policy for Operands

An appropriate execution sequence can significantly reduce the total amount of data temporarily stored in the cluster. The following figure shows an example of tree reduction: circles represent operands and squares represent chunks; red indicates an operand being executed, blue an operand ready for execution, green an operand whose generated chunks have been stored, and gray an operand whose data has been released. Assuming that we have two workers and that every operand uses the same amount of resources, each figure shows the state after 5 time units of execution under a different policy: on the left, the nodes are executed level by level; on the right, they are executed in depth-first order. The level-by-level policy needs to temporarily store the data of 6 chunks, while the depth-first policy needs to store only 2. Based on this observation, the following rules are used when selecting operands:

  1. Operands with a larger depth need to be executed first;
  2. Operands that are relied upon by deeper operands need to be executed first;
  3. Nodes with smaller output sizes need to be executed first.
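These three rules can be expressed as a sort key, assuming each operand knows its depth, the depth of its deepest dependent, and its output size (a sketch, not Mars's internal representation):

```python
# Priority key mirroring the three rules above: larger depth first, then
# larger depth of the deepest dependent, then smaller output size.
def operand_priority(depth, max_successor_depth, output_size):
    """Return a sort key: smaller keys are scheduled first."""
    return (-depth, -max_successor_depth, output_size)
```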

Selection Policy for Workers

When the scheduler is ready to execute the graph, the workers for the initial operands have already been determined. We allocate workers for subsequent operands based on where their input data is located. The worker that holds the largest amount of input data is selected to execute the subsequent operand. If multiple workers hold the same amount of input data, the resource state of each candidate worker is decisive.
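A minimal sketch of this selection rule, assuming we know how many bytes of input each candidate worker holds and how many free execution slots it has (the names are illustrative):

```python
# Pick the worker holding the most input bytes; break ties by the most
# free resources, standing in for the "resource state" of a candidate.
def select_worker(input_bytes_by_worker, free_slots_by_worker):
    """input_bytes_by_worker: worker -> bytes of input data stored there."""
    return max(input_bytes_by_worker,
               key=lambda w: (input_bytes_by_worker[w],
                              free_slots_by_worker.get(w, 0)))
```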

Operand State

Each operand in Mars is scheduled separately by an OperandActor, and execution proceeds as a series of state transitions. In the OperandActor, we define a state transition function for entering each state. An initial operand starts in the READY state, while a non-initial operand starts in the UNSCHEDULED state. When the given conditions are met, the operand transitions to another state and the corresponding operations are performed. The state transitions are shown in the following figure:

  • UNSCHEDULED: An operand is in this state when its upstream data is not ready.
  • READY: An operand is in this state when all upstream input data is ready. After it enters this state, the OperandActor submits jobs to all workers selected by the AssignerActor. If a worker is ready to run the job, it sends a message to the scheduler. The scheduler sends a stop message to the other workers and then sends a message to the chosen worker to start the job execution.
  • RUNNING: An operand is in this state when its execution has been started. When it enters this state, the OperandActor checks whether the job has been submitted. If not, the OperandActor constructs a graph composed of FetchChunk Operands and the current operand, and submits it to the worker. Afterwards, the OperandActor registers a callback in the worker to obtain the message indicating that the job is completed.
  • FINISHED: An operand is in this state when the job is completed. When an operand enters this state and has no successors, a message is sent to the GraphActor to determine whether the execution of the entire graph has ended. At the same time, the OperandActor sends a message to its precursors and successors that the execution is completed. If a precursor receives the message, it checks whether all the successors have been completed. If so, the data on the current operand can be released. If a successor receives the message, it checks whether all the precursors have been completed. If so, the successor state can be transitioned to READY.
  • FREED: An operand is in this state when all its data has been released.
  • FATAL: An operand is in this state when all attempts to re-execute it have failed. When an operand enters this state, it propagates the same state to its successors.
  • CANCELLING: An operand is in this state when it is being canceled. If the job is currently executing, a request to cancel execution is sent to the worker.
  • CANCELLED: An operand is in this state when the execution is canceled and stopped. If the execution enters this state, the OperandActor attempts to transition the state of all successors to CANCELLING.
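The transitions described above can be summarized as a small table-driven state machine; this sketch is restricted to the transitions the text mentions and is not Mars's actual implementation:

```python
# Legal operand state transitions, per the description above. RUNNING may
# return to READY because a failed execution can be retried.
ALLOWED = {
    "UNSCHEDULED": {"READY", "CANCELLED"},
    "READY": {"RUNNING", "CANCELLING", "CANCELLED"},
    "RUNNING": {"FINISHED", "READY", "CANCELLING", "FATAL"},
    "FINISHED": {"FREED"},
    "CANCELLING": {"CANCELLED"},
}

def transition(state, new_state):
    """Return new_state if the transition is legal, else raise ValueError."""
    if new_state not in ALLOWED.get(state, set()):
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state
```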

Execution Details in Workers

A Mars worker contains multiple processes to reduce the impact of GIL on execution. The specific execution is completed in an independent process. To reduce unnecessary memory copying and inter-process communication, a Mars worker uses shared memory to store execution results.

Controlling the Execution

A Mars worker controls the execution of all operators in the worker through an ExecutionActor. The actor itself is not involved in actual operations or data transfers, and only submits tasks to other actors.

Sorting of Operands

All operands in the READY state are submitted to the worker selected by the scheduler. For most of the execution time, the number of operands submitted to a worker therefore exceeds the number the worker can handle at once, so the worker must sort the operands and select some of them to execute. This sorting is performed in the TaskQueueActor, which maintains a priority queue holding information about the operands. The TaskQueueActor also periodically runs a job allocation task, granting execution resources to the operands at the top of the priority queue until no resources remain. Allocation is also triggered when a new operand is submitted or when the execution of an operand is completed.
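A sketch of such a queue using Python's `heapq`, with a slot counter standing in for execution resources; the class and method names are illustrative, not the TaskQueueActor's real interface:

```python
import heapq
import itertools

# Sketch of a TaskQueueActor-style priority queue: operands wait in a heap
# and resources are granted to the head of the queue until slots run out.
class TaskQueue:
    def __init__(self, slots):
        self.slots = slots                    # free execution slots
        self._heap = []
        self._counter = itertools.count()     # tie-breaker for stable order

    def submit(self, operand, priority):
        heapq.heappush(self._heap, (priority, next(self._counter), operand))

    def allocate(self):
        """Grant slots to the highest-priority operands; return them."""
        started = []
        while self._heap and self.slots > 0:
            _, _, operand = heapq.heappop(self._heap)
            self.slots -= 1
            started.append(operand)
        return started

    def finish(self, operand):
        self.slots += 1                       # completion frees a slot
        return self.allocate()                # re-trigger allocation
```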

Memory Management

A Mars worker manages two aspects of memory. The first part is the private memory of each worker process, which is held by each process itself. The second part is the memory shared by all processes, which is held by plasma_store in Apache Arrow.
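Mars itself uses Apache Arrow's plasma_store for the shared part; purely to illustrate the idea of a result living in memory that several processes can map without copying, here is a stdlib-based sketch (not Mars's actual mechanism, and the function names are invented):

```python
from multiprocessing import shared_memory

# Illustration only: one process writes a result into a named shared
# segment; another process can attach to the same name and read it
# without the bytes being copied through a pipe or socket.
def put_result(name, payload: bytes):
    shm = shared_memory.SharedMemory(name=name, create=True, size=len(payload))
    shm.buf[:len(payload)] = payload
    shm.close()

def get_result(name, size):
    shm = shared_memory.SharedMemory(name=name)
    data = bytes(shm.buf[:size])
    shm.close()
    shm.unlink()      # free the segment once the result is consumed
    return data
```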

Future Work

Mars is currently undergoing rapid iteration. We are considering implementing worker-level failover and shuffle support in the near future, and scheduler-level failover is also planned.



Alibaba Cloud