# Processing Cartesian Products with PyODPS DataFrame

By Ji Sheng

PyODPS provides DataFrame APIs for analyzing and preprocessing large-scale data through a pandas-like interface. This article describes how to perform Cartesian product operations with PyODPS.

The most common use of a Cartesian product is to compare or combine two collections. Take the computation of geographic distance as an example. Assume that the large table Coordinates1 stores the latitude and longitude of destination points in M rows, while the small table Coordinates2 stores the latitude and longitude of departure points in N rows. The task is to find, for each destination point, the departure point closest to it. For a given destination point, we need to compute the distance from every departure point to it and then take the minimum. The intermediate result therefore contains M * N records, which is a Cartesian product.
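To make the M * N growth concrete, the following minimal sketch builds the Cartesian product of two tiny in-memory collections with the standard library. The sample coordinates and variable names are made up for illustration and are not part of the tables described above.

```python
from itertools import product

# Hypothetical sample data: (id, latitude, longitude)
destinations = [("d0", 30.0, 120.0), ("d1", 31.0, 121.0), ("d2", 32.0, 122.0)]  # M = 3
departures = [("s0", 30.5, 120.5), ("s1", 35.0, 115.0)]  # N = 2

# Every destination point is paired with every departure point.
pairs = list(product(destinations, departures))

print(len(pairs))  # M * N = 6
```

With M = 2000 and N = 100 the same pairing already yields 200,000 records, which is why the intermediate result is the part of the job that needs care.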

# Haversine Formula

```python
def haversine(lat1, lon1, lat2, lon2):
    # lat1, lon1: latitude and longitude of location 1, in degrees
    # lat2, lon2: latitude and longitude of location 2, in degrees
    import numpy as np

    dlon = np.radians(lon2 - lon1)
    dlat = np.radians(lat2 - lat1)
    a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    r = 6371  # mean radius of the Earth, in kilometers
    return c * r
```
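As a quick sanity check of the formula, here is a scalar variant written with only the standard library `math` module (the name `haversine_scalar` is introduced here for illustration). Two points on the equator that are 90 degrees of longitude apart should be a quarter of a great circle apart, that is, about π / 2 × 6371 ≈ 10007.5 km.

```python
import math

def haversine_scalar(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points given in degrees."""
    dlon = math.radians(lon2 - lon1)
    dlat = math.radians(lat2 - lat1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    return 2 * math.asin(math.sqrt(a)) * 6371  # 6371 km: mean radius of the Earth

quarter_circle = haversine_scalar(0.0, 0.0, 0.0, 90.0)   # a quarter of the equator
same_point = haversine_scalar(10.0, 20.0, 10.0, 20.0)    # identical points

print(round(quarter_circle, 1), same_point)
```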

# MapJoin

```python
In : df1 = o.get_table('coordinates1').to_df()

In : df2 = o.get_table('coordinates2').to_df()

In : df3 = df1.join(df2, mapjoin=True)

In : df1.schema
Out: odps.Schema {
  latitude     float64
  longitude    float64
  id           string
}

In : df2.schema
Out: odps.Schema {
  latitude     float64
  longitude    float64
  id           string
}

In : df3.schema
Out: odps.Schema {
  latitude_x     float64
  longitude_x    float64
  id_x           string
  latitude_y     float64
  longitude_y    float64
  id_y           string
}
```

We can see that the `_x` and `_y` suffixes are added to the duplicate column names by default during a join. The suffixes can be customized by passing a two-element tuple in the `suffixes` parameter. Once the joined table is available, the distance can be computed with the built-in functions of the PyODPS DataFrame, which is simple, clear, and efficient.

```python
In : r = 6371
...: dis1 = (df3.latitude_y - df3.latitude_x).radians()
...: dis2 = (df3.longitude_y - df3.longitude_x).radians()
...: a = (dis1 / 2).sin() ** 2 + df3.latitude_x.radians().cos() * df3.latitude_y.radians().cos() * (dis2 / 2).sin() ** 2
...: df3['dis'] = 2 * a.sqrt().arcsin() * r

In : df3.head(10)
Out:
   latitude_x  longitude_x id_x  latitude_y  longitude_y id_y          dis
0   76.252432    59.628253    0   84.045210     6.517522    0  1246.864981
1   76.252432    59.628253    0   59.061796     0.794939    1  2925.953147
2   76.252432    59.628253    0   42.368304    30.119837    2  4020.604942
3   76.252432    59.628253    0   81.290936    51.682749    3   584.779748
4   76.252432    59.628253    0   34.665222   147.167070    4  6213.944942
5   76.252432    59.628253    0   58.058854   165.471565    5  4205.219179
6   76.252432    59.628253    0   79.150677    58.661890    6   323.070785
7   76.252432    59.628253    0   72.622352   123.195778    7  1839.380760
8   76.252432    59.628253    0   80.063614   138.845193    8  1703.782421
9   76.252432    59.628253    0   36.231584    90.774527    9  4717.284949

In : df1.count()
Out: 2000

In : df2.count()
Out: 100

In : df3.count()
Out: 200000
```

`df3` now contains M * N records. To find the minimum distance, we can call `groupby` on `df3` and apply the `min` aggregate function, which returns the minimum distance for each destination point.

```python
In : df3.groupby('id_x').dis.min().head(10)
Out:
       dis_min
0   323.070785
1    64.755493
2  1249.283169
3   309.818288
4  1790.484748
5   385.107739
6   498.816157
7   615.987467
8   437.765432
9   272.589621
```
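The join-then-aggregate pattern above can be mirrored on small in-memory data to see what each step contributes. The sketch below is plain Python, not PyODPS; the sample points and the helper name `dist_km` are hypothetical.

```python
import math
from itertools import product

def dist_km(lat1, lon1, lat2, lon2):
    """Haversine distance in kilometers for coordinates given in degrees."""
    dlon = math.radians(lon2 - lon1)
    dlat = math.radians(lat2 - lat1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    return 2 * math.asin(math.sqrt(a)) * 6371

# Hypothetical sample data: id -> (latitude, longitude)
destinations = {"d0": (0.0, 0.0), "d1": (0.0, 90.0)}
departures = {"s0": (0.0, 10.0), "s1": (0.0, 80.0), "s2": (0.0, 180.0)}

# Step 1: Cartesian product, one record per (destination, departure) pair.
records = [
    (d_id, s_id, dist_km(*d_pos, *s_pos))
    for (d_id, d_pos), (s_id, s_pos) in product(destinations.items(), departures.items())
]

# Step 2: group by destination ID and keep the minimum distance.
min_dis = {}
for d_id, _, dis in records:
    min_dis[d_id] = min(dis, min_dis.get(d_id, float("inf")))

print(len(records), min_dis)
```

Note that the aggregation only returns the minimum distance per destination, just as `groupby('id_x').dis.min()` does; recovering the ID of the closest departure point needs an extra step.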

# Table Resources

```python
# use dataframe udf

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()

def func(collections):
    import pandas as pd

    # collections is the list passed via resources; df2 is its only element
    collection = collections[0]

    ids = []
    latitudes = []
    longitudes = []
    for r in collection:
        ids.append(r.id)
        latitudes.append(r.latitude)
        longitudes.append(r.longitude)

    df = pd.DataFrame({'id': ids, 'latitude': latitudes, 'longitude': longitudes})

    def h(x):
        # distance from the current destination row to every departure point
        df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
        return df.iloc[df['dis'].idxmin()]['id']

    return h

df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute(
    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])
```

In the UDF, the table resource is read row by row into a pandas DataFrame. The row with the minimum distance is then located with `idxmin` and `iloc`, which gives the ID of the closest departure point.
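The closure pattern used by `func` can be tried out locally without an ODPS connection. The sketch below is a stand-in built on assumptions: `Row`, `make_nearest_finder`, `dist_km`, and the sample data are hypothetical, and a plain list replaces the table resource. It only illustrates that the outer function reads the resource once, while the inner function runs once per row.

```python
import math
from collections import namedtuple

Row = namedtuple("Row", ["id", "latitude", "longitude"])  # hypothetical row type

def dist_km(lat1, lon1, lat2, lon2):
    """Haversine distance in kilometers for coordinates given in degrees."""
    dlon = math.radians(lon2 - lon1)
    dlat = math.radians(lat2 - lat1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2)
    return 2 * math.asin(math.sqrt(a)) * 6371

def make_nearest_finder(resources):
    # Outer function: runs once and materializes the first resource.
    departures = list(resources[0])

    def h(x):
        # Inner function: runs for every destination row.
        nearest = min(
            departures,
            key=lambda r: dist_km(x.latitude, x.longitude, r.latitude, r.longitude),
        )
        return nearest.id

    return h

# A plain list stands in for the table resource.
departure_rows = [Row("s0", 0.0, 10.0), Row("s1", 0.0, 80.0)]
find_nearest = make_nearest_finder([departure_rows])

print(find_nearest(Row("d0", 0.0, 0.0)))   # closest departure for d0
print(find_nearest(Row("d1", 0.0, 90.0)))  # closest departure for d1
```

Doing the expensive work (loading the small table) in the outer function keeps the per-row cost down to the distance computation itself.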

# Global Variable

```python
df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()
df = df2.to_pandas()

def func(x):
    df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
    return df.iloc[df['dis'].idxmin()]['id']

df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(
    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])
```

When the function is uploaded, the global variables it uses (`df` in the code above) are pickled into the UDF. The applicable use cases for this method are very limited, however, because ODPS limits the size of uploaded files: if there is too much data, the resource generated for the UDF is too large to upload. In addition, the client-side versions of the third-party packages should match the server-side versions, otherwise serialization problems are very likely to occur. We therefore recommend this method only for small data volumes.
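Before relying on a global variable, it can help to check roughly how large it becomes once serialized. The sketch below uses the standard `pickle` module as a rough proxy; the serializer PyODPS actually uses and the real upload limit are not stated here, so `limit_bytes` is a placeholder to replace with the limit that applies to your project, and `fits_in_udf` is a hypothetical helper name.

```python
import pickle

def fits_in_udf(obj, limit_bytes):
    """Return (size, ok): the pickled size of obj and whether it is within limit_bytes."""
    size = len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))
    return size, size <= limit_bytes

# Hypothetical stand-in for the small departure table: 100 rows of (id, lat, lon).
small_table = [(str(i), float(i), float(i)) for i in range(100)]

# Placeholder limit, not an ODPS value: substitute the real limit for your project.
size, ok = fits_in_udf(small_table, limit_bytes=10 * 1024 * 1024)
print(size, ok)
```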
