1
1
#!/usr/bin/python3
2
2
"""Roads."""
3
3
import os
4
+ from dataclasses import dataclass
4
5
from enum import auto
5
6
from enum import Enum
7
+ from typing import Any
6
8
7
9
from tqdm import tqdm
8
10
@@ -28,69 +30,151 @@ class CityRoadType(Enum):
28
30
ACCESS_ROAD = auto ()
29
31
30
32
31
- HIGHWAY_TAGS_PER_CITY_ROAD_TYPE = {
32
- CityRoadType .HIGHWAY : ["motorway" , "trunk" , "primary" ],
33
- CityRoadType .SECONDARY_ROAD : ["tertiary" , "secondary" ],
34
- CityRoadType .STREET : ["residential" , "living_street" ],
35
- CityRoadType .ACCESS_ROAD : ["unclassified" , "service" ]
33
+ class RoadPrecisionLevel :
34
+ """City road precision level."""
35
+ def __init__ (self , name : str , priority : int ):
36
+ self .name = name
37
+ self .priority = priority
38
+
39
+ def __str__ (self ) -> str :
40
+ return self .name
41
+
42
+ def __eq__ (self , other : Any ) -> bool :
43
+ if isinstance (other , RoadPrecisionLevel ):
44
+ return self .priority == other .priority
45
+ return False
46
+
47
+ def __lt__ (self , other : 'RoadPrecisionLevel' ) -> bool :
48
+ if isinstance (other , RoadPrecisionLevel ):
49
+ return self .priority < other .priority
50
+ return NotImplemented
51
+
52
+
53
+ class CityRoadPrecision (Enum ):
54
+ """Enum defining different road precision levels."""
55
+ VERY_HIGH = RoadPrecisionLevel (name = "Very-High" , priority = 3 )
56
+ HIGH = RoadPrecisionLevel (name = "High" , priority = 2 )
57
+ MEDIUM = RoadPrecisionLevel (name = "Medium" , priority = 1 )
58
+ LOW = RoadPrecisionLevel (name = "Low" , priority = 0 )
59
+
60
+ @classmethod
61
+ def from_string (cls , precision_name : str ) -> 'CityRoadPrecision' :
62
+ """Convert a string representation to a CityRoadPrecision enum value."""
63
+ for precision in cls :
64
+ if precision .value .name .lower () == precision_name .lower ():
65
+ return precision
66
+ raise ValueError (f"Invalid precision level: { precision_name } " )
67
+
68
+ @property
69
+ def priority (self ) -> int :
70
+ """Get the priority value for the precision level."""
71
+ return self .value .priority
72
+
73
+
74
+ @dataclass (frozen = True )
75
+ class RoadTypeData :
76
+ """Data for each type of city road."""
77
+ tags : list [str ]
78
+ priority : int
79
+ query_name : str
80
+
81
+ # Dictionary of RoadTypeData for each CityRoadType, ordered by priority
82
+ ROAD_TYPE_DATA : dict [CityRoadType , RoadTypeData ] = {
83
+ CityRoadType .HIGHWAY : RoadTypeData (tags = ["motorway" , "trunk" , "primary" ], priority = 0 , query_name = "highway" ),
84
+ CityRoadType .SECONDARY_ROAD : RoadTypeData (tags = ["tertiary" , "secondary" ], priority = 1 , query_name = "secondary_roads" ),
85
+ CityRoadType .STREET : RoadTypeData (tags = ["residential" , "living_street" ], priority = 2 , query_name = "street" ),
86
+ CityRoadType .ACCESS_ROAD : RoadTypeData (tags = ["unclassified" , "service" ], priority = 3 , query_name = "access_roads" )
36
87
}
37
88
38
- QUERY_NAME_PER_CITY_ROAD_TYPE = {
39
- CityRoadType .HIGHWAY : "highway" ,
40
- CityRoadType .SECONDARY_ROAD : "secondary_roads" ,
41
- CityRoadType .STREET : "street" ,
42
- CityRoadType .ACCESS_ROAD : "access_roads"
43
- }
44
-
45
- assert HIGHWAY_TAGS_PER_CITY_ROAD_TYPE .keys () == QUERY_NAME_PER_CITY_ROAD_TYPE .keys ()
89
+ # Automatically check that ROAD_TYPE_DATA is sorted by priority
90
+ assert list (ROAD_TYPE_DATA .keys ()) == sorted (ROAD_TYPE_DATA .keys (),
91
+ key = lambda road_type : ROAD_TYPE_DATA [road_type ].priority )
46
92
47
- CityRoads = dict [CityRoadType , list [ListLonLat ]]
48
93
94
+ def get_city_roads_with_priority_better_than (precision : CityRoadPrecision ) -> list [CityRoadType ]:
95
+ """Returns a list of CityRoadType with a priority better than the given x."""
96
+ # Filter ROAD_TYPE_DATA to get only those with priority less than x
97
+ return [
98
+ road_type
99
+ for road_type , data in ROAD_TYPE_DATA .items ()
100
+ if data .priority <= precision .priority
101
+ ]
49
102
50
103
@profile
51
104
def prepare_download_city_roads (query : OverpassQuery ,
52
- bounds : GpxBounds ) -> None :
105
+ bounds : GpxBounds ,
106
+ road_precision : CityRoadPrecision ) -> list [CityRoadType ]:
53
107
"""Download roads map from OpenStreetMap.
54
108
55
109
Args:
56
110
query: OverpassQuery class that merge all queries into a single one
57
111
bounds: GPX bounds
112
+ road_precision: string with the road precision desired
58
113
59
114
Returns:
60
115
List of roads (sequence of lon, lat coordinates) for each road type
61
116
"""
62
117
cache_pkl = ROADS_CACHE .get_path (bounds )
63
118
119
+ logger .debug (f"Road precision: { road_precision .name } " )
120
+
121
+ roads_to_plot : list [CityRoadType ] = get_city_roads_with_priority_better_than (road_precision )
122
+
64
123
if os .path .isfile (cache_pkl ):
124
+ cityroads_cache : dict [CityRoadType , list [ListLonLat ]] = read_pickle (file_path = cache_pkl )
125
+ roads_types_cache = list (cityroads_cache .keys ())
65
126
query .add_cached_result (ROADS_CACHE .name , cache_file = cache_pkl )
66
- return
67
127
68
- for city_road_type in tqdm (CityRoadType ):
69
- highway_tags_str = "|" .join (HIGHWAY_TAGS_PER_CITY_ROAD_TYPE [city_road_type ])
70
- query .add_overpass_query (QUERY_NAME_PER_CITY_ROAD_TYPE [city_road_type ],
71
- [f"way['highway'~'({ highway_tags_str } )']" ],
72
- bounds ,
73
- include_way_nodes = True ,
74
- add_relative_margin = None )
128
+ if all (key in roads_types_cache for key in roads_to_plot ):
129
+ logger .debug ("Roads needed already downloaded" )
130
+ roads_to_plot = []
131
+ else :
132
+ logger .debug ("Downloading additionnal roads" )
133
+ roads_to_plot = [key for key in roads_to_plot if key not in roads_types_cache ]
134
+
135
+ if len (roads_to_plot ) > 0 :
136
+ for city_road_type in tqdm (roads_to_plot ):
137
+ highway_tags_str = "|" .join (ROAD_TYPE_DATA [city_road_type ].tags )
138
+ query_name = ROAD_TYPE_DATA [city_road_type ].query_name
139
+ query .add_overpass_query (query_name ,
140
+ [f"way['highway'~'({ highway_tags_str } )']" ],
141
+ bounds ,
142
+ include_way_nodes = True ,
143
+ add_relative_margin = None )
144
+ return roads_to_plot
75
145
76
146
77
147
@profile
78
148
def process_city_roads (query : OverpassQuery ,
79
- bounds : GpxBounds ) -> dict [CityRoadType , list [ListLonLat ]]:
149
+ bounds : GpxBounds ,
150
+ city_roads_downloaded : list [CityRoadType ],
151
+ road_precision : CityRoadPrecision ) -> dict [CityRoadType ,list [ListLonLat ]]:
80
152
"""Query the overpass API to get the roads of a city."""
81
- if query .is_cached (ROADS_CACHE .name ):
82
- cache_file = query .get_cache_file (ROADS_CACHE .name )
83
- return read_pickle (cache_file )
84
-
85
- with Profiling .Scope ("Process City Roads" ):
86
- roads = dict ()
87
- for city_road_type , query_name in QUERY_NAME_PER_CITY_ROAD_TYPE .items ():
88
- logger .debug (f"Query name : { query_name } " )
89
- result = query .get_query_result (query_name )
90
- roads [city_road_type ] = get_ways_coordinates_from_results (result )
153
+ roads_to_plot : list [CityRoadType ] = get_city_roads_with_priority_better_than (road_precision )
154
+
155
+ if len (city_roads_downloaded ) > 0 :
156
+ # We need to process some downloaded road types.
157
+ if query .is_cached (ROADS_CACHE .name ):
158
+ cache_file = query .get_cache_file (ROADS_CACHE .name )
159
+ roads = read_pickle (cache_file )
160
+ else :
161
+ roads = dict ()
162
+ with Profiling .Scope ("Process City Roads" ):
163
+ for city_road_type in city_roads_downloaded :
164
+ query_name = ROAD_TYPE_DATA [city_road_type ].query_name
165
+ result = query .get_query_result (query_name )
166
+ roads [city_road_type ] = get_ways_coordinates_from_results (result )
167
+ cache_pkl = ROADS_CACHE .get_path (bounds )
168
+ write_pickle (cache_pkl , roads )
169
+ query .add_cached_result (ROADS_CACHE .name , cache_file = cache_pkl )
170
+ roads_to_return = {road_type : roads [road_type ] for road_type in roads_to_plot }
171
+ return roads_to_return
91
172
92
- cache_pkl = ROADS_CACHE .get_path (bounds )
93
- write_pickle (cache_pkl , roads )
94
- query .add_cached_result (ROADS_CACHE .name , cache_file = cache_pkl )
173
+ elif query .is_cached (ROADS_CACHE .name ):
174
+ cache_file = query .get_cache_file (ROADS_CACHE .name )
175
+ roads = read_pickle (cache_file )
176
+ roads_to_return = {road_type : roads [road_type ] for road_type in roads_to_plot }
177
+ return roads_to_return
95
178
96
- return roads
179
+ else :
180
+ raise FileNotFoundError ("Query is supposed to be cached but it is not." )
0 commit comments