What’s New with Mars — Alibaba’s Distributed Scientific Computing Engine

By Ji Sheng

In March 2020, Mars 0.4.0b1, 0.4.0b2, 0.3.2, and 0.3.3 were released. See the release notes of each version for details. Two pre-release versions came out this month because of a special circumstance: 0.4.0b2 fixes urgent problems found in 0.4.0b1.

Mars Project Release Cycle

See the GitHub project's milestones for the latest pre-release and official versions.

See the GitHub project for the categorized issues and pull requests (PRs).

The v0.4 release contains the ongoing issues and PRs, which are archived by version. In other releases, issues and PRs are categorized by module.

Features in the New Version

Better Aggregation and Group Aggregation

  • #1054 added support for DataFrame.aggregate and Series.aggregate.
  • #1019 and #1069 added support for cumulative computations such as cummax.

For example, in pandas, we can perform the following operations on MovieLens data:

In [1]: import pandas as pd

In [2]: %%time
   ...: df = pd.read_csv('Downloads/ml-20m/ratings.csv')
   ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']})
   ...:
   ...:
CPU times: user 5.41 s, sys: 1.28 s, total: 6.7 s
Wall time: 4.3 s
Out[2]:
        rating
           max  min      mean       std
movieId
1          5.0  0.5  3.921240  0.889012
2          5.0  0.5  3.211977  0.951150
3          5.0  0.5  3.151040  1.006642
4          5.0  0.5  2.861393  1.095702
5          5.0  0.5  3.064592  0.982140
...        ...  ...       ...       ...
131254     4.0  4.0  4.000000       NaN
131256     4.0  4.0  4.000000       NaN
131258     2.5  2.5  2.500000       NaN
131260     3.0  3.0  3.000000       NaN
131262     4.0  4.0  4.000000       NaN

[26744 rows x 4 columns]

Here we group the data by movie ID and compute the maximum, minimum, mean, and standard deviation of the user ratings for each movie.
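To make the shape of that result easier to see, here is a minimal pandas sketch on a tiny, made-up ratings table (the values are hypothetical and not taken from MovieLens). It also shows why movies with a single rating get NaN as their standard deviation, and what a cumulative function such as cummax returns.

```python
import pandas as pd

# Hypothetical toy ratings, invented for illustration only.
ratings = pd.DataFrame({
    "movieId": [1, 1, 1, 2, 2, 3],
    "rating": [3.0, 5.0, 4.0, 2.0, 4.0, 3.5],
})

# One row per movie; the columns form a two-level index:
# ('rating', 'max'), ('rating', 'min'), ('rating', 'mean'), ('rating', 'std').
summary = ratings.groupby("movieId").agg({"rating": ["max", "min", "mean", "std"]})

# Movie 3 has a single rating, so its sample standard deviation is NaN.
single_rating_std = summary.loc[3, ("rating", "std")]

# cummax keeps the running maximum seen so far, row by row.
running_max = ratings["rating"].cummax()

print(summary)
print(running_max.tolist())
```

With Mars, the same calls would presumably be written against mars.dataframe and followed by execute(), as in the session shown in this section.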

When Mars is used:

In [1]: import mars.dataframe as md

In [2]: %%time
   ...: df = md.read_csv('Downloads/ml-20m/ratings.csv')
   ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']}).execute()
   ...:
   ...:
CPU times: user 5.81 s, sys: 6.9 s, total: 12.7 s
Wall time: 1.54 s
Out[2]:
        rating
           max  min      mean       std
movieId
1          5.0  0.5  3.921240  0.889012
2          5.0  0.5  3.211977  0.951150
3          5.0  0.5  3.151040  1.006642
4          5.0  0.5  2.861393  1.095702
5          5.0  0.5  3.064592  0.982140
...        ...  ...       ...       ...
131254     4.0  4.0  4.000000       NaN
131256     4.0  4.0  4.000000       NaN
131258     2.5  2.5  2.500000       NaN
131260     3.0  3.0  3.000000       NaN
131262     4.0  4.0  4.000000       NaN

[26744 rows x 4 columns]

The code is almost the same, except that Mars requires a call to execute() to trigger the computation.

The ratings.csv file is more than 500 MB. Even on a single machine, Mars speeds up the computation noticeably: in the runs above, the wall time drops from 4.3 s to 1.54 s, which is nearly three times faster. The acceleration improves as the data size grows. If a single server cannot meet the requirements, the same code can be distributed across multiple servers with Mars.

Sorting

  • #1046 added support for sort_values.

We will use MovieLens data as an example again:

In [1]: import pandas as pd

In [2]: %%time
   ...: ratings = pd.read_csv('Downloads/ml-20m/ratings.csv')
   ...: movies = pd.read_csv('Downloads/ml-20m/movies.csv')
   ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'})
   ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId')
   ...: result.sort_values(by='rating', ascending=False)
   ...:
   ...:
CPU times: user 5.17 s, sys: 1.13 s, total: 6.3 s
Wall time: 4.05 s
Out[2]:
       movieId  rating                                  title
19152    95517     5.0      Barchester Chronicles, The (1982)
21842   105846     5.0                   Only Daughter (2013)
17703    89133     5.0                   Boys (Drenge) (1977)
21656   105187     5.0              Linotype: The Film (2012)
21658   105191     5.0                    Rocaterrania (2009)
...        ...     ...                                    ...
26465   129784     0.5            Xuxa in Crystal Moon (1990)
18534    92479     0.5         Kisses for My President (1964)
26475   129834     0.5  Tom and Jerry: The Lost Dragon (2014)
24207   115631     0.5             Alone for Christmas (2013)
25043   119909     0.5                  Sharpe's Eagle (1993)

[26744 rows x 3 columns]

This computes the average rating of each film, joins in the film titles, and sorts the films in descending order by average rating.

In Mars, the code is the same apart from the import and the final execute() call.

In [1]: import mars.dataframe as md

In [2]: %%time
   ...: ratings = md.read_csv('Downloads/ml-20m/ratings.csv')
   ...: movies = md.read_csv('Downloads/ml-20m/movies.csv')
   ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'})
   ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId')
   ...: result.sort_values(by='rating', ascending=False).execute()
   ...:
   ...:
CPU times: user 4.97 s, sys: 6.01 s, total: 11 s
Wall time: 1.39 s
Out[2]:
       movieId  rating                                  title
19152    95517     5.0      Barchester Chronicles, The (1982)
21842   105846     5.0                   Only Daughter (2013)
17703    89133     5.0                   Boys (Drenge) (1977)
21656   105187     5.0              Linotype: The Film (2012)
21658   105191     5.0                    Rocaterrania (2009)
...        ...     ...                                    ...
26465   129784     0.5            Xuxa in Crystal Moon (1990)
18534    92479     0.5         Kisses for My President (1964)
26475   129834     0.5  Tom and Jerry: The Lost Dragon (2014)
24207   115631     0.5             Alone for Christmas (2013)
25043   119909     0.5                  Sharpe's Eagle (1993)

[26744 rows x 3 columns]

Mars implements this sort with Parallel Sorting by Regular Sampling (PSRS).
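For readers unfamiliar with PSRS, the following is a single-process NumPy sketch of the general idea. It is not Mars's implementation: the function name psrs_sort and the parameter n_chunks are hypothetical, the "workers" are simulated with plain lists, and the pivot selection is simplified.

```python
import numpy as np


def psrs_sort(values, n_chunks=4):
    """Single-process simulation of Parallel Sorting by Regular Sampling."""
    values = np.asarray(values)
    if n_chunks < 2 or len(values) < n_chunks:
        return np.sort(values)

    # Step 1: split the data into chunks and sort each chunk locally.
    chunks = [np.sort(chunk) for chunk in np.array_split(values, n_chunks)]

    # Step 2: take n_chunks regularly spaced samples from every sorted chunk.
    samples = np.concatenate([
        chunk[np.linspace(0, len(chunk) - 1, n_chunks).astype(int)]
        for chunk in chunks
    ])

    # Step 3: sort the gathered samples and pick n_chunks - 1 pivots from them.
    samples.sort()
    pivots = samples[n_chunks::n_chunks]

    # Step 4: cut every sorted chunk at the pivots into n_chunks value ranges.
    pieces = [
        np.split(chunk, np.searchsorted(chunk, pivots, side="right"))
        for chunk in chunks
    ]

    # Step 5: range j of every chunk is sent to "worker" j, which merges them.
    # A real implementation would do a k-way merge; np.sort keeps the sketch short.
    merged = [
        np.sort(np.concatenate([pieces[i][j] for i in range(n_chunks)]))
        for j in range(n_chunks)
    ]

    # Step 6: worker j only holds values between pivot j-1 and pivot j,
    # so concatenating the workers' results in order gives the sorted data.
    return np.concatenate(merged)


rng = np.random.default_rng(0)
data = rng.integers(0, 50, size=1000)
result = psrs_sort(data, n_chunks=4)
print(result[:10])
```

Because the pivots come from regularly spaced samples of already sorted chunks, the ranges handed to each worker tend to be of similar size, which is what makes the approach attractive for distributed sorting. A descending sort like the one in the session would simply reverse the final result.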

Improved Index Support

  • #1042 added support for loc.
  • #1101 added support for at and iat.
  • #1073 added support for the md.date_range method.

With loc, we can select data by index label more conveniently.

In [1]: import mars.dataframe as md

In [3]: import mars.tensor as mt

In [8]: df = md.DataFrame(mt.random.rand(10000, 10), index=md.date_range('2000-1-1', periods=10000))

In [9]: df.loc['2020-3-25'].execute()
Out[9]:
0    0.372354
1    0.139235
2    0.511007
3    0.102200
4    0.908454
5    0.144455
6    0.290627
7    0.248334
8    0.912666
9    0.830526
Name: 2020-03-25 00:00:00, dtype: float64
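The difference between loc, at, and iat is easiest to see on a small frame with known values. The sketch below uses plain pandas and made-up data; assuming Mars mirrors these accessors as the release notes describe, the Mars version would use md.DataFrame and end each lookup with execute().

```python
import numpy as np
import pandas as pd

# Hypothetical 4x3 frame indexed by four consecutive days.
index = pd.date_range("2020-03-23", periods=4)
df = pd.DataFrame(np.arange(12).reshape(4, 3), index=index, columns=["a", "b", "c"])

# loc selects by label: a single date returns that row as a Series.
row = df.loc["2020-03-25"]

# Label slices with loc include both endpoints.
window = df.loc["2020-03-24":"2020-03-25"]

# at reads one cell by row label and column label.
cell_by_label = df.at[pd.Timestamp("2020-03-25"), "b"]

# iat reads one cell by integer position (third row, second column).
cell_by_position = df.iat[2, 1]

print(row.tolist(), window.shape, cell_by_label, cell_by_position)
```

Both at and iat address the same cell here; at is label-based and iat is position-based.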

Custom Functions, Strings, and Time Processing

  • #1063 added support for md.Series.str and md.Series.dt to process strings and time columns.
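As a quick reminder of what the str and dt accessors do, here is a small pandas sketch on made-up values. The series contents are hypothetical; with Mars, the same accessors would presumably be called on md.Series objects and followed by execute().

```python
import pandas as pd

# Hypothetical city names with inconsistent whitespace and casing.
cities = pd.Series(["hangzhou", "Beijing ", " shanghai"])
cleaned = cities.str.strip().str.title()

# Hypothetical timestamps; dt exposes datetime components element-wise.
stamps = pd.Series(pd.to_datetime(["2020-03-25 08:30", "2020-12-31 23:59"]))
years = stamps.dt.year
weekdays = stamps.dt.day_name()

print(cleaned.tolist())
print(years.tolist(), weekdays.tolist())
```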

We can use apply with a custom function to calculate the distance from each city in the dataset to Hangzhou (120°12' E and 30°16' N).

In [1]: import numpy as np

In [2]: def haversine(lat1, lon1, lat2, lon2):
   ...:     dlon = np.radians(lon2 - lon1)
   ...:     dlat = np.radians(lat2 - lat1)
   ...:     a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2
   ...:     c = 2 * np.arcsin(np.sqrt(a))
   ...:     r = 6371
   ...:     return c * r
   ...:

In [4]: import mars.dataframe as md

In [5]: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M', dtype={'Region': object}
   ...: )

In [6]: df.execute(fetch=False)

In [8]: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute()
Out[8]:
0          9789.135208
1          9788.270528
2          9788.270528
3          9788.270528
4          9789.307210
              ...
248061    10899.720735
248062    11220.703197
248063    10912.645753
248064    11318.038981
248065    11141.080171
Length: 3173958, dtype: float64
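The haversine function can be sanity-checked on its own with inputs whose answer is known. The sketch below repeats the function from the session; the helper dms_to_decimal and the test points are hypothetical additions for illustration.

```python
import numpy as np


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    dlon = np.radians(lon2 - lon1)
    dlat = np.radians(lat2 - lat1)
    a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371  # mean Earth radius in km
    return c * r


def dms_to_decimal(degrees, minutes):
    """Hypothetical helper: convert degrees and arc minutes to decimal degrees."""
    return degrees + minutes / 60


# 30°16' N, 120°12' E in decimal degrees (about 30.267 and 120.2).
hangzhou_lat = dms_to_decimal(30, 16)
hangzhou_lon = dms_to_decimal(120, 12)

# A point is at distance zero from itself.
same_point = haversine(hangzhou_lat, hangzhou_lon, hangzhou_lat, hangzhou_lon)

# Two equatorial points 90 degrees apart are a quarter of the circumference apart.
quarter_equator = haversine(0.0, 0.0, 0.0, 90.0)

# The function also accepts NumPy arrays, so whole columns can be passed at once.
distances = haversine(np.array([0.0, 0.0]), np.array([0.0, 90.0]), 0.0, 0.0)

print(same_point, quarter_equator, distances)
```

The session passes 30.25 and 120.17, which are close decimal approximations of the coordinates given in the text.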

Moving Window Functions

Moving window functions are used frequently in the financial field. A rolling operation applies an aggregation over a window of fixed length (or a fixed time interval) that slides along the data. The following provides an example:

In [1]: import pandas_datareader.data as web

In [2]: data = web.DataReader("^TWII", "yahoo", "2000-01-01","2020-03-25")

In [3]: import mars.dataframe as md

In [4]: df = md.DataFrame(data)

In [5]: df.rolling(10, min_periods=1).mean().execute()
Out[5]:
                    High           Low          Open         Close     Volume     Adj Close
Date
2000-01-04   8803.610352   8642.500000   8644.910156   8756.549805        0.0   8756.517578
2000-01-05   8835.645020   8655.259766   8667.754883   8803.209961        0.0   8803.177734
2000-01-06   8898.426758   8714.809896   8745.356445   8842.816732        0.0   8842.784180
2000-01-07   8909.012451   8720.964844   8772.374756   8844.580078        0.0   8844.547607
2000-01-10   8952.413867   8755.129883   8806.285742   8896.183984        0.0   8896.151172
...                  ...           ...           ...           ...        ...           ...
2020-03-19  10423.317090  10083.132910  10370.730078  10180.533887  4149640.0  10180.533887
2020-03-20  10202.623047   9833.786914  10105.280078   9971.761914  4366130.0   9971.761914
2020-03-23   9983.399023   9611.036914   9885.659082   9763.000977  3990040.0   9763.000977
2020-03-24   9821.716016   9436.392969   9703.275098   9591.208984  3927690.0   9591.208984
2020-03-25   9685.129980   9290.444922   9543.636035   9466.308984  4003760.0   9466.308984

[4974 rows x 6 columns]
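The effect of the window length and of min_periods is easier to follow on a short series with round numbers. This pandas sketch uses made-up values; it illustrates why the first rows of the output above already contain averages instead of NaN.

```python
import pandas as pd

# Hypothetical prices, invented for illustration.
prices = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])

# Window of 3 rows; min_periods=1 averages whatever is available at the start.
smoothed = prices.rolling(3, min_periods=1).mean()

# Without min_periods, the first two windows are incomplete and yield NaN.
strict = prices.rolling(3).mean()

print(smoothed.tolist())
print(strict.tolist())
```

The first value of smoothed is the mean of one element, the second the mean of two, and from the third row on each value is the mean of the last three prices.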

Plans for Upcoming Versions
