Ray is an open-source, easy-to-use library for distributed computing in Python. In my opinion, its biggest advantage is that Python code can be parallelized almost as it is, without being rewritten in a framework-specific way: functions can be used as tasks and classes can be used as actors. If your process consists of several functions that produce data and pass it on to other functions, it is better to use classes (actors). This reduces the amount of data sent between workers. Ideally, the entire process and its data flow should be executed by a single actor. In this post I will compare the running times of the spatial analytics from my previous post about geopandas, run as a single process vs. distributed with Ray.
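To make the data-flow argument concrete, here is a toy accounting sketch in plain Python (no Ray required). It assumes that every value handed from one task to the next has to be serialized, and it uses `pickle` byte counts only as a rough proxy for Ray's own serialization. The steps `load_data`, `transform_data` and `summarize_data` and the class `PipelineActor` are hypothetical stand-ins, not the geopandas code from this post:

```python
import pickle

# Hypothetical pipeline steps standing in for read / project / analyse
def load_data(n):
    return list(range(n))

def transform_data(data):
    return [x * 2 for x in data]

def summarize_data(data):
    return sum(data)

def task_style(n):
    """Chain stateless functions; count bytes that would be serialized between steps."""
    shipped = 0
    data = load_data(n)
    shipped += len(pickle.dumps(data))    # step 1 result handed to step 2
    data = transform_data(data)
    shipped += len(pickle.dumps(data))    # step 2 result handed to step 3
    result = summarize_data(data)
    shipped += len(pickle.dumps(result))  # final result returned to the caller
    return result, shipped

class PipelineActor:
    """Keeps intermediate data in its own state; only the final result leaves it."""

    def __init__(self, n):
        self.n = n

    def load(self):
        self.data = load_data(self.n)

    def transform(self):
        self.data = transform_data(self.data)

    def summarize(self):
        return summarize_data(self.data)

def actor_style(n):
    actor = PipelineActor(n)
    actor.load()
    actor.transform()
    result = actor.summarize()
    return result, len(pickle.dumps(result))  # only the final result is shipped

print(task_style(100_000))
print(actor_style(100_000))
```

In the task-style chain both intermediate lists are serialized, whereas the actor keeps them in its own state and only the final number leaves it. Real costs in Ray also depend on where workers run, so treat the byte counts as an illustration of the trend only.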
```python
%%capture --no-display
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import ray
import time

warnings.filterwarnings('ignore')

# Input layers from the previous post: administrative regions, main roads and main rivers
admins = gpd.read_file('admins/admin.shp')
roads = gpd.read_file('roads/roads_main.shp')
rivers = gpd.read_file('rivers/rivers_main.shp')
```
The following class will be used for single-process execution. As you will see later, exactly the same class, with just one additional decorator, will be used for parallel processing.
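```python
class spatial_actor():
    def __init__(self, admin):
        # 'all' or a list of voivodeship names (values of the JPT_NAZWA_ column)
        self.admin = admin

    def read_admin(self):
        gdf_a = gpd.read_file('admins/admin.shp')
        if self.admin == 'all':
            self.gdf_a = gdf_a
        elif set(self.admin).issubset(['śląskie', 'opolskie', 'świętokrzyskie', 'pomorskie', 'podlaskie', 'zachodniopomorskie',
                                       'dolnośląskie', 'wielkopolskie', 'podkarpackie', 'małopolskie', 'warmińsko-mazurskie',
                                       'łódzkie', 'mazowieckie', 'kujawsko-pomorskie', 'lubelskie', 'lubuskie']):
            self.gdf_a = gdf_a[gdf_a['JPT_NAZWA_'].isin(self.admin)]
        else:
            raise ValueError('Provided admin list is not valid!')

    def read_roads(self):
        # Read only the roads that intersect the selected admin regions
        self.gdf_r = gpd.read_file('roads/roads_main.shp', mask=self.gdf_a)

    def read_rivers(self):
        # Read only the rivers that intersect the selected admin regions
        self.gdf_w = gpd.read_file('rivers/rivers_main.shp', mask=self.gdf_a)

    def project(self, crs):
        # Reproject all layers to a metric CRS (EPSG:32634, WGS 84 / UTM zone 34N, in this post)
        self.gdf_a.to_crs(crs, inplace=True)
        self.gdf_r.to_crs(crs, inplace=True)
        self.gdf_w.to_crs(crs, inplace=True)

    def get_road_stats_per_admin(self):
        # Assign roads to admin regions and sum their lengths in km (CRS units are metres)
        roads_admins = self.gdf_r.sjoin(self.gdf_a, how='left')
        roads_admins['length'] = roads_admins.geometry.length / 1000
        self.road_stats = roads_admins.groupby('JPT_NAZWA_')['length'].sum()
        return self.road_stats

    def get_bridges(self):
        # Points where roads cross rivers
        bridge_loc = self.gdf_r.unary_union.intersection(self.gdf_w.unary_union)
        # Buffering the whole (Multi)Point gives the union of 10 m buffers around every crossing;
        # unlike `for p in bridge_loc`, this also works with Shapely 2 (multi-part geometries are not iterable)
        bridge_buff = gpd.GeoDataFrame(geometry=[bridge_loc.buffer(10)], crs=self.gdf_r.crs)
        # Road pieces inside the buffers, returned in WGS 84 (EPSG:4326)
        self.gdf_b = self.gdf_r.clip(mask=bridge_buff, keep_geom_type=False).to_crs(4326)
        return self.gdf_b
```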
```python
def execute_road_stats_and_bridges():
    start = time.time()
    # A single actor processes the whole country
    actor = spatial_actor('all')
    actor.read_admin()
    actor.read_roads()
    actor.read_rivers()
    actor.project(32634)
    result_bridges = actor.get_bridges()
    result_bridges.drop_duplicates(inplace=True)
    result_stats = actor.get_road_stats_per_admin()
    end = time.time()
    print(f'Running times {end-start}')
    return result_stats, result_bridges

result_local = execute_road_stats_and_bridges()
```
Running times 165.72526788711548
The single-process running time is 165.7 s.
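The timings in this post are differences of `time.time()` values. As a sketch, the same measurement can be wrapped in a small context manager based on `time.perf_counter()`, a high-resolution clock intended for measuring durations, whereas `time.time()` reports wall-clock time that can jump if the system clock is adjusted. The `Timer` class is a hypothetical helper, not part of the original code:

```python
import time

class Timer:
    """Context manager that measures elapsed time with time.perf_counter()."""

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.perf_counter() - self.start
        print(f"Running time: {self.elapsed:.1f} s")
        return False  # do not suppress exceptions raised inside the block

# Stand-in workload; in the post this would wrap execute_road_stats_and_bridges()
with Timer() as t:
    total = sum(i * i for i in range(1_000_000))
```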
```python
result_local[0]
```

```
JPT_NAZWA_
dolnośląskie           4520.744740
kujawsko-pomorskie     3733.476072
lubelskie              3849.608949
lubuskie               2849.326089
mazowieckie            6754.754435
małopolskie            3210.074281
opolskie               2140.885651
podkarpackie           3229.634655
podlaskie              2703.961342
pomorskie              3627.349681
warmińsko-mazurskie    3345.741497
wielkopolskie          5596.932594
zachodniopomorskie     3677.008321
łódzkie                3573.407349
śląskie                3806.134792
świętokrzyskie         2067.127021
Name: length, dtype: float64
```
```python
result_local[1]
```

| | osm_id | code | fclass | name | ref | oneway | maxspeed | layer | bridge | tunnel | geometry
---|---|---|---|---|---|---|---|---|---|---|---|
103874 | 26552134 | 5114 | secondary | None | 958 | B | 0 | 1 | T | F | LINESTRING (19.84935 49.29105, 19.84947 49.29091) |
108792 | 296393209 | 5114 | secondary | None | 958 | B | 0 | 0 | F | F | LINESTRING (19.84933 49.29106, 19.84935 49.29105) |
103878 | 26552159 | 5114 | secondary | None | 958 | B | 50 | 1 | T | F | LINESTRING (19.81896 49.35792, 19.81892 49.35774) |
103908 | 26579303 | 5114 | secondary | Kolejowa | 957 | B | 50 | 1 | T | F | LINESTRING (19.86247 49.44046, 19.86273 49.44040) |
103870 | 26552130 | 5114 | secondary | None | 959 | B | 50 | 1 | T | F | LINESTRING (19.81311 49.36749, 19.81283 49.36749) |
… | … | … | … | … | … | … | … | … | … | … | … |
118459 | 40674169 | 5114 | secondary | None | 1514G | B | 50 | 1 | T | F | LINESTRING (18.43984 54.63708, 18.43981 54.63690) |
119206 | 123267800 | 5114 | secondary | Gdyńska | 1514G | B | 50 | 0 | F | F | LINESTRING (18.43785 54.64092, 18.43794 54.64075) |
120629 | 200760872 | 5114 | secondary | None | 214 | B | 40 | 0 | F | F | LINESTRING (17.59644 54.67753, 17.59647 54.67750) |
124837 | 437067145 | 5114 | secondary | None | 214 | B | 40 | 0 | T | F | LINESTRING (17.59637 54.67767, 17.59644 54.67753) |
127003 | 660801683 | 5114 | secondary | Morska | 1306G | B | 40 | 0 | F | F | LINESTRING (17.79064 54.75550, 17.79071 54.755… |
2481 rows × 11 columns
Now it's time to see Ray in action. As I said before, our code can be used as it is; the only difference is the additional `@ray.remote` decorator before the class definition:
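```python
@ray.remote
class spatial_actor():
    def __init__(self, admin):
        # 'all' or a list of voivodeship names (values of the JPT_NAZWA_ column)
        self.admin = admin

    def read_admin(self):
        gdf_a = gpd.read_file('admins/admin.shp')
        if self.admin == 'all':
            self.gdf_a = gdf_a
        elif set(self.admin).issubset(['śląskie', 'opolskie', 'świętokrzyskie', 'pomorskie', 'podlaskie', 'zachodniopomorskie',
                                       'dolnośląskie', 'wielkopolskie', 'podkarpackie', 'małopolskie', 'warmińsko-mazurskie',
                                       'łódzkie', 'mazowieckie', 'kujawsko-pomorskie', 'lubelskie', 'lubuskie']):
            self.gdf_a = gdf_a[gdf_a['JPT_NAZWA_'].isin(self.admin)]
        else:
            raise ValueError('Provided admin list is not valid!')

    def read_roads(self):
        # Read only the roads that intersect the selected admin regions
        self.gdf_r = gpd.read_file('roads/roads_main.shp', mask=self.gdf_a)

    def read_rivers(self):
        # Read only the rivers that intersect the selected admin regions
        self.gdf_w = gpd.read_file('rivers/rivers_main.shp', mask=self.gdf_a)

    def project(self, crs):
        # Reproject all layers to a metric CRS (EPSG:32634, WGS 84 / UTM zone 34N, in this post)
        self.gdf_a.to_crs(crs, inplace=True)
        self.gdf_r.to_crs(crs, inplace=True)
        self.gdf_w.to_crs(crs, inplace=True)

    def get_road_stats_per_admin(self):
        # Assign roads to admin regions and sum their lengths in km (CRS units are metres)
        roads_admins = self.gdf_r.sjoin(self.gdf_a, how='left')
        roads_admins['length'] = roads_admins.geometry.length / 1000
        self.road_stats = roads_admins.groupby('JPT_NAZWA_')['length'].sum()
        return self.road_stats

    def get_bridges(self):
        # Points where roads cross rivers
        bridge_loc = self.gdf_r.unary_union.intersection(self.gdf_w.unary_union)
        # Buffering the whole (Multi)Point gives the union of 10 m buffers around every crossing;
        # unlike `for p in bridge_loc`, this also works with Shapely 2 (multi-part geometries are not iterable)
        bridge_buff = gpd.GeoDataFrame(geometry=[bridge_loc.buffer(10)], crs=self.gdf_r.crs)
        # Road pieces inside the buffers, returned in WGS 84 (EPSG:4326)
        self.gdf_b = self.gdf_r.clip(mask=bridge_buff, keep_geom_type=False).to_crs(4326)
        return self.gdf_b
```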
The next function executes our distributed process and uses a few Ray-specific calls.

`ray.init()` starts a Ray runtime on your machine. We could specify the resources to use, e.g. the number of CPUs or GPUs via the `num_cpus` and `num_gpus` arguments, but in this example we let Ray decide. At the end of the function you will find `ray.shutdown()`, which terminates the Ray runtime.

`spatial_actor.remote(...)` creates an actor from the decorated `spatial_actor` class. Poland has 16 L2 administrative regions (voivodeships), so we create 16 actors, one per region.

Calling an actor method with `.remote()` returns an object reference immediately instead of the result. To fetch the results from the actors, pass these references to `ray.get`, which waits until they are ready, e.g. `ray.get([actor1.read_admin.remote(), actor2.read_admin.remote(), actor3.read_admin.remote(), ...])`.
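Writing out sixteen `actorN = spatial_actor.remote([...])` lines works, but the actors and the repeated `ray.get(...)` calls can also be generated in a loop. A minimal sketch, assuming `actor_cls` is a `@ray.remote`-decorated class such as `spatial_actor`; `REGIONS`, `create_actors` and `call_on_all` are hypothetical helper names, not part of Ray:

```python
# Hypothetical helpers; actor_cls is assumed to be a @ray.remote class such as spatial_actor
REGIONS = ['śląskie', 'świętokrzyskie', 'podlaskie', 'dolnośląskie', 'podkarpackie',
           'warmińsko-mazurskie', 'mazowieckie', 'lubelskie', 'opolskie', 'pomorskie',
           'zachodniopomorskie', 'wielkopolskie', 'małopolskie', 'łódzkie',
           'kujawsko-pomorskie', 'lubuskie']

def create_actors(actor_cls, regions=REGIONS):
    """Create one actor per region, passing the region as a one-element list."""
    return [actor_cls.remote([region]) for region in regions]

def call_on_all(actors, method_name, *args):
    """Submit the same method call to every actor and return the list of object references."""
    return [getattr(actor, method_name).remote(*args) for actor in actors]

# Intended usage inside the Ray process (not executed here):
# actors = create_actors(spatial_actor)
# ray.get(call_on_all(actors, 'read_admin'))
# ray.get(call_on_all(actors, 'project', 32634))
# result_stats = pd.concat(ray.get(call_on_all(actors, 'get_road_stats_per_admin')))
```

Because the helpers only rely on the `.remote()` calling convention, they can be exercised with plain stand-in objects, without a running Ray cluster.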
```python
def execute_road_stats_and_bridges():
    ray.init()  # start a local Ray runtime; Ray decides which resources to use
    start = time.time()
    # One actor per voivodeship
    actor1 = spatial_actor.remote(['śląskie'])
    actor2 = spatial_actor.remote(['świętokrzyskie'])
    actor3 = spatial_actor.remote(['podlaskie'])
    actor4 = spatial_actor.remote(['dolnośląskie'])
    actor5 = spatial_actor.remote(['podkarpackie'])
    actor6 = spatial_actor.remote(['warmińsko-mazurskie'])
    actor7 = spatial_actor.remote(['mazowieckie'])
    actor8 = spatial_actor.remote(['lubelskie'])
    actor9 = spatial_actor.remote(['opolskie'])
    actor10 = spatial_actor.remote(['pomorskie'])
    actor11 = spatial_actor.remote(['zachodniopomorskie'])
    actor12 = spatial_actor.remote(['wielkopolskie'])
    actor13 = spatial_actor.remote(['małopolskie'])
    actor14 = spatial_actor.remote(['łódzkie'])
    actor15 = spatial_actor.remote(['kujawsko-pomorskie'])
    actor16 = spatial_actor.remote(['lubuskie'])
    actors = [actor1, actor2, actor3, actor4, actor5, actor6, actor7, actor8,
              actor9, actor10, actor11, actor12, actor13, actor14, actor15, actor16]
    # Each .remote() call returns an object reference at once; ray.get waits for all of them
    ray.get([actor.read_admin.remote() for actor in actors])
    ray.get([actor.read_roads.remote() for actor in actors])
    ray.get([actor.read_rivers.remote() for actor in actors])
    ray.get([actor.project.remote(32634) for actor in actors])
    result_bridges = pd.concat(ray.get([actor.get_bridges.remote() for actor in actors]))
    # Each actor reads every road crossing its border, so neighbouring actors can find the same bridge
    result_bridges.drop_duplicates(inplace=True)
    result_stats = pd.concat(ray.get([actor.get_road_stats_per_admin.remote() for actor in actors]))
    end = time.time()
    print(f'Running times {end-start}')
    ray.shutdown()  # terminate the Ray runtime
    return result_stats, result_bridges

result_ray = execute_road_stats_and_bridges()
```
Running times 49.32021403312683
With Ray, our process runs more than 3 times faster than with the single-process approach.
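As a quick check of that claim, the speedup computed from the two printed running times (one run of each variant on the same machine, so indicative rather than a rigorous benchmark):

$$
S = \frac{T_{\text{single}}}{T_{\text{Ray}}} = \frac{165.73\ \text{s}}{49.32\ \text{s}} \approx 3.36,
\qquad
T_{\text{single}} - T_{\text{Ray}} \approx 116.4\ \text{s}
$$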
```python
result_ray[0]
```

```
JPT_NAZWA_
śląskie                3806.134792
świętokrzyskie         2067.127021
podlaskie              2703.961342
dolnośląskie           4520.744740
podkarpackie           3229.634655
warmińsko-mazurskie    3345.741497
mazowieckie            6754.754435
lubelskie              3849.608949
opolskie               2140.885651
pomorskie              3627.349681
zachodniopomorskie     3677.008321
wielkopolskie          5596.932594
małopolskie            3210.074281
łódzkie                3573.407349
kujawsko-pomorskie     3733.476072
lubuskie               2849.326089
Name: length, dtype: float64
```
```python
result_ray[1]
```

| | osm_id | code | fclass | name | ref | oneway | maxspeed | layer | bridge | tunnel | geometry
---|---|---|---|---|---|---|---|---|---|---|---|
15942 | 26552142 | 5114 | secondary | None | 958 | B | 0 | 1 | T | F | LINESTRING (19.81895 49.39608, 19.81912 49.39594) |
15964 | 26579303 | 5114 | secondary | Kolejowa | 957 | B | 50 | 1 | T | F | LINESTRING (19.86247 49.44046, 19.86273 49.44040) |
18518 | 284516714 | 5114 | secondary | None | 958 | B | 0 | 1 | T | F | LINESTRING (19.89012 49.54462, 19.89025 49.544… |
19528 | 408756008 | 5114 | secondary | None | 958 | B | 0 | 0 | F | F | LINESTRING (19.89012 49.54462, 19.89012 49.54462) |
15959 | 26568242 | 5113 | primary | None | 47 | B | 100 | 1 | T | F | LINESTRING (19.93035 49.59553, 19.93038 49.59536) |
… | … | … | … | … | … | … | … | … | … | … | … |
752 | 33732182 | 5113 | primary | None | B 1 | B | 100 | 1 | T | F | LINESTRING (14.62101 52.57465, 14.62122 52.57478) |
763 | 34203180 | 5113 | primary | Władysława Sikorskiego | 31 | B | 50 | 1 | T | F | LINESTRING (14.64574 52.58370, 14.64590 52.58385) |
1342 | 143317901 | 5111 | motorway | Autostrada Wolności | A2 | F | 140 | 1 | T | F | LINESTRING (15.07017 52.32466, 15.06988 52.32465) |
882 | 71358223 | 5113 | primary | None | B 1 | B | 50 | 1 | T | F | LINESTRING (14.62876 52.57901, 14.62864 52.57888) |
738 | 33507506 | 5113 | primary | None | 22 | B | 50 | 1 | T | F | LINESTRING (14.62878 52.57904, 14.62876 52.57901) |
2482 rows × 11 columns
As you can see, in our case the same code runs about 3 times faster with Ray. Note that although we used distributed execution, the entire computation ran on a single machine. Imagine what we could achieve by scaling it out further with a cloud solution, e.g. AWS EMR.