Skip to content

Commit

Permalink
Adding cache for transfer_path
Browse files Browse the repository at this point in the history
  • Loading branch information
azime committed Feb 8, 2024
1 parent 7bec236 commit 1ff0456
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 22 deletions.
1 change: 1 addition & 0 deletions source/jormungandr/jormungandr/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
'TIMEOUT_SYNTHESE': 30,
'TIMEOUT_KRAKEN_COVERAGES': 60,
'FETCH_S3_DATA_TIMEOUT': 24 * 60,
"TIMEOUT_TRANSFER_PATH": 10 * 60,
}

CACHE_CONFIGURATION = json.loads(os.getenv('JORMUNGANDR_CACHE_CONFIGURATION', '{}')) or default_cache
Expand Down
169 changes: 147 additions & 22 deletions source/jormungandr/jormungandr/scenarios/helper_classes/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import copy
from collections import namedtuple
from navitiacommon import response_pb2, type_pb2
from jormungandr.protobuf_to_dict import protobuf_to_dict
from jormungandr import app, cache
import itertools
import logging
from jormungandr.street_network.street_network import StreetNetworkPathType
Expand Down Expand Up @@ -59,6 +61,19 @@
"physical_mode:Metro",
)

MAP_STRING_PTOBJECT_TYPE = {
"STOP_POINT": type_pb2.STOP_POINT,
"ACCESS_POINT": type_pb2.ACCESS_POINT,
"ADDRESS": type_pb2.ADDRESS,
}


MAP_CYCLEPATHTYPE = {
"NoCycleLane": response_pb2.NoCycleLane,
"SharedCycleWay": response_pb2.SharedCycleWay,
"DedicatedCycleWay": response_pb2.DedicatedCycleWay,
"SeparatedCycleWay": response_pb2.SeparatedCycleWay,
}
# if `(physical_mode:A, physical_mode:B) in NO_ACCESS_POINTS_TRANSFER` then it means that a transfer
# where we get out of a vehicle of `physical_mode:A` and then get in a vehicle of `physical_mode:B`
# **will not** go through an access point
Expand All @@ -74,10 +89,24 @@
itertools.product(ACCESS_POINTS_PHYSICAL_MODES, NO_ACCESS_POINTS_PHYSICAL_MODES)
) | set(itertools.product(NO_ACCESS_POINTS_PHYSICAL_MODES, ACCESS_POINTS_PHYSICAL_MODES))


TransferResult = namedtuple('TransferResult', ['direct_path', 'origin', 'destination'])


class TransferPathArgs:
def __init__(self, section, prev_section_mode, next_section_mode):
self.prev_section_mode = prev_section_mode
self.next_section_mode = next_section_mode
self.section = section

def __repr__(self):
return "{origin}:{destination}:{prev_section_mode}:{next_section_mode}".format(
origin=self.section.origin.uri,
destination=self.section.destination.uri,
prev_section_mode=self.prev_section_mode,
next_section_mode=self.next_section_mode,
)


class TransferPool(object):
def __init__(
self,
Expand All @@ -94,6 +123,15 @@ def __init__(
self._transfers_future = dict()
self._logger = logging.getLogger(__name__)

def __repr__(self):
return "{name}:{language}:{publication_date}".format(
name=self._instance.name, language=self.language, publication_date=self._instance.publication_date
)

@property
def language(self):
return self._request.get('language', "en-US")

def _make_sub_request_id(self, origin_uri, destination_uri):
return "{}_transfer_{}_{}".format(self._request_id, origin_uri, destination_uri)

Expand Down Expand Up @@ -167,9 +205,16 @@ def _do_no_access_point_transfer(self, section):
def _aysnc_no_access_point_transfer(self, section):
return self._future_manager.create_future(self._do_no_access_point_transfer, section)

def _get_access_points(self, stop_point_uri, access_point_filter=lambda x: x):
sub_request_id = "{}_transfer_start_{}".format(self._request_id, stop_point_uri)
stop_points = self._instance.georef.get_stop_points_from_uri(stop_point_uri, sub_request_id, depth=2)
def _get_access_points(self, pt_object, access_point_filter=lambda x: x):
if self._request.get("_pt_planner") == "loki":
return [
type_pb2.PtObject(name=ap.name, uri=ap.uri, embedded_type=type_pb2.ACCESS_POINT, access_point=ap)
for ap in pt_object.stop_point.access_points
if access_point_filter(ap)
]

sub_request_id = "{}_transfer_start_{}".format(self._request_id, pt_object.uri)
stop_points = self._instance.georef.get_stop_points_from_uri(pt_object.uri, sub_request_id, depth=2)
if not stop_points:
return None

Expand All @@ -187,12 +232,12 @@ def get_underlying_access_points(self, section, prev_section_mode, next_section_
"""
if prev_section_mode in ACCESS_POINTS_PHYSICAL_MODES:
return self._get_access_points(
section.origin.uri, access_point_filter=lambda access_point: access_point.is_exit
section.origin, access_point_filter=lambda access_point: access_point.is_exit
)

if next_section_mode in ACCESS_POINTS_PHYSICAL_MODES:
return self._get_access_points(
section.destination.uri, access_point_filter=lambda access_point: access_point.is_entrance
section.destination, access_point_filter=lambda access_point: access_point.is_entrance
)

return None
Expand Down Expand Up @@ -241,6 +286,7 @@ def determinate_the_best_access_point(routing_matrix, access_points):
return best_access_point

def _get_transfer_result(self, section, origin, destination):

sub_request_id = self._make_sub_request_id(origin.uri, destination.uri)
direct_path_type = StreetNetworkPathType.DIRECT
extremity = PeriodExtremity(section.end_date_time, False)
Expand All @@ -255,36 +301,102 @@ def _get_transfer_result(self, section, origin, destination):
sub_request_id,
)
if direct_path and direct_path.journeys:
return TransferResult(direct_path, origin, destination)
return None

def _do_access_point_transfer(self, section, prev_section_mode, next_section_mode):
access_points = self.get_underlying_access_points(section, prev_section_mode, next_section_mode)
return (
protobuf_to_dict(direct_path, use_enum_labels=True),
protobuf_to_dict(origin, use_enum_labels=True),
protobuf_to_dict(destination, use_enum_labels=True),
)
return None, None, None

def pb_object_from_json(self, json_object):
pb_object = type_pb2.PtObject()
str_embedded_type = json_object.get("embedded_type")
pb_embedded_type = MAP_STRING_PTOBJECT_TYPE.get(str_embedded_type)
str_embedded_type = str_embedded_type.lower()
pb_object.embedded_type = pb_embedded_type
for attribute in ["uri", "name"]:
setattr(pb_object, attribute, json_object.get(attribute))
pb_attr = getattr(pb_object, str_embedded_type)
json_data = json_object.get(str_embedded_type, {})
for attribute in ["uri", "name", "label"]:
if attribute in json_data:
setattr(pb_attr, attribute, json_data.get(attribute))
pb_attr.coord.lon = json_data.get("coord", {}).get("lon")
pb_attr.coord.lat = json_data.get("coord", {}).get("lat")
if pb_embedded_type == type_pb2.ACCESS_POINT:
for attribute in ["is_entrance", "is_exit", "pathway_mode", "length", "traversal_time", "stop_code"]:
setattr(pb_attr, attribute, json_data.get(attribute))
pb_attr.embedded_type = type_pb2.pt_access_point
return pb_object

def pb_transfer_path(self, json_transfer_path):
resp = response_pb2.Response()
resp.status_code = 200
resp.response_type = response_pb2.ITINERARY_FOUND
for json_journey in json_transfer_path.get("journeys", []):
journey = resp.journeys.add()
for json_section in json_journey.get("sections", []):
section = journey.sections.add()
section.type = response_pb2.STREET_NETWORK
section.street_network.mode = response_pb2.Walking
street_network = json_section.get("street_network", {})
for item in street_network.get("path_items", []):
path_item = section.street_network.path_items.add()
for attribute in ["name", "length", "direction", "duration", "instruction"]:
if attribute in item:
setattr(path_item, attribute, item.get(attribute))
instruction_start_coordinate = item.get("instruction_start_coordinate")
if instruction_start_coordinate:
path_item.instruction_start_coordinate.lon = instruction_start_coordinate.get("lon")
path_item.instruction_start_coordinate.lat = instruction_start_coordinate.get("lat")
if "cycle_path_type" in item:
path_item.cycle_path_type = MAP_CYCLEPATHTYPE.get(item["cycle_path_type"])

for coord in street_network.get("coordinates", []):
section.street_network.coordinates.add(lon=coord.get("lon"), lat=coord.get("lat"))
for st_info in street_network.get("street_information", []):
pb_street_information = section.street_network.street_information.add()
pb_street_information.geojson_offset = st_info.get("geojson_offset")
pb_street_information.cycle_path_type = MAP_CYCLEPATHTYPE[st_info.get("cycle_path_type")]
pb_street_information.length = st_info.get("length")

return resp

@cache.memoize(app.config[str('CACHE_CONFIGURATION')].get(str('TIMEOUT_TRANSFER_PATH'), 10 * 60))
def get_cached_transfer_path(self, transfer_path_args):
access_points = self.get_underlying_access_points(
transfer_path_args.section,
transfer_path_args.prev_section_mode,
transfer_path_args.next_section_mode,
)
# if no access points are found for this stop point, which is supposed to have access points
# we do nothing about the transfer path
if not access_points:
return None
return None, None, None

origins, destinations = self.determinate_matrix_entry(
section, access_points, prev_section_mode, next_section_mode
transfer_path_args.section,
access_points,
transfer_path_args.prev_section_mode,
transfer_path_args.next_section_mode,
)

if len(origins) > 1 and len(destinations) > 1:
self._logger.error(
"Error occurred when computing transfer path both origin's and destination's sizes are larger than 1"
)
return None
return None, None, None

if len(origins) == 1 and len(destinations) == 1:
return self._get_transfer_result(section, origins[0], destinations[0])
return self._get_transfer_result(transfer_path_args.section, origins[0], destinations[0])

sub_request_id = "{}_transfer_matrix".format(self._request_id)
routing_matrix = self._streetnetwork_service.get_street_network_routing_matrix(
self._instance,
origins,
destinations,
FallbackModes.walking.name,
section.duration * 3,
transfer_path_args.section.duration * 3,
self._request,
sub_request_id,
)
Expand All @@ -293,16 +405,30 @@ def _do_access_point_transfer(self, section, prev_section_mode, next_section_mod
matrix.routing_status == response_pb2.unreached for matrix in routing_matrix.rows[0].routing_response
):
logging.getLogger(__name__).warning("no access points is reachable in transfer path computation")
return None
return None, None, None

# now it's time to find the best combo
# (stop_point -> access_points or access_points -> stop_point)
best_access_point = self.determinate_the_best_access_point(routing_matrix, access_points)

origin, destination = self.determinate_direct_path_entry(
section, best_access_point, prev_section_mode, next_section_mode
transfer_path_args.section,
best_access_point,
transfer_path_args.prev_section_mode,
transfer_path_args.next_section_mode,
)
return self._get_transfer_result(transfer_path_args.section, origin, destination)

def _do_access_point_transfer(self, section, prev_section_mode, next_section_mode):
path, origin, destination = self.get_cached_transfer_path(
TransferPathArgs(section, prev_section_mode, next_section_mode)
)
return self._get_transfer_result(section, origin, destination)
if not path:
return None
pb_origin = self.pb_object_from_json(origin)
pb_destination = self.pb_object_from_json(destination)
pb_path = self.pb_transfer_path(path)
return TransferResult(pb_path, pb_origin, pb_destination)

def _aysnc_access_point_transfer(self, section, prev_section_mode, next_section_mode):
return self._future_manager.create_future(
Expand Down Expand Up @@ -351,22 +477,21 @@ def wait_and_complete(self, section):

# we assume here the transfer street network has only one section, which is in walking mode
transfer_street_network = transfer_direct_path.journeys[0].sections[0].street_network
language = self._request.get('language', "en-US")

if self._is_access_point(transfer_result.origin):
prepend_path_item_with_access_point(
transfer_street_network.path_items,
section.origin.stop_point,
transfer_result.origin.access_point,
language,
self.language,
)

if self._is_access_point(transfer_result.destination):
append_path_item_with_access_point(
transfer_street_network.path_items,
section.destination.stop_point,
transfer_result.destination.access_point,
language,
self.language,
)

section.street_network.CopyFrom(transfer_street_network)

0 comments on commit 1ff0456

Please sign in to comment.