diff --git a/source/jormungandr/jormungandr/realtime_schedule/forseti_multi_stop.py b/source/jormungandr/jormungandr/realtime_schedule/forseti_multi_stop.py new file mode 100644 index 0000000000..fc3d595de8 --- /dev/null +++ b/source/jormungandr/jormungandr/realtime_schedule/forseti_multi_stop.py @@ -0,0 +1,229 @@ +# coding=utf-8 + +# Copyright (c) 2001-2022, Hove and/or its affiliates. All rights reserved. +# +# This file is part of Navitia, +# the software to build cool stuff with public transport. +# +# Hope you'll enjoy and contribute to this project, +# powered by Hove (www.hove.com). +# Help us simplify mobility and open public transport: +# a non ending quest to the responsive locomotion way of traveling! +# +# LICENCE: This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# Stay tuned using +# twitter @navitia +# channel `#navitia` on riot https://riot.im/app/#/room/#navitia:matrix.org +# https://groups.google.com/d/forum/navitia +# www.navitia.io +from __future__ import absolute_import, print_function, division +from jormungandr.realtime_schedule.realtime_proxy import RealtimeProxy, RealtimeProxyError +from jormungandr.utils import PY3 +import logging +import pybreaker +import pytz +import requests as requests +from jormungandr import cache, app +from jormungandr.schedule import RealTimePassage +import aniso8601 +import six + +""" +In Forseti direction types are transformed as followings +"forward", "outbound": "forward" +"backward", "inbound": "backward" +""" +DIRECTION_MAPPING = { + 'forward': 'forward', + 'outbound': 'forward', + 'backward': 'backward', + 'inbound': 'backward', +} + + +class ForsetiMultiStop(RealtimeProxy): + """ + class managing calls to Forseti service providing real-time next passages + + curl example to check/test that external service is working: + curl -X GET '{server}/departures?stop_id={stop_code}' + {stop_code} is the code of type configured for a stop_point + So in practice it will look like: + curl -X GET 'http://..forseti../departures?stop_id=472' + + """ + + def __init__( + self, + id, + service_url, + object_id_tag="source", + destination_id_tag="source", + instance=None, + timeout=2, + line_id_tag="source", + **kwargs + ): + self.service_url = service_url + self.timeout = timeout # timeout in seconds + self.rt_system_id = id + self.object_id_tag = object_id_tag + self.destination_id_tag = destination_id_tag + self.instance = instance + self.line_id_tag = line_id_tag + + fail_max = kwargs.get( + 'circuit_breaker_max_fail', app.config.get(str('CIRCUIT_BREAKER_MAX_SYTRAL_FAIL'), 5) + ) + reset_timeout = kwargs.get( + 'circuit_breaker_reset_timeout', app.config.get(str('CIRCUIT_BREAKER_SYTRAL_TIMEOUT_S'), 60) + ) + self.breaker = pybreaker.CircuitBreaker(fail_max=fail_max, reset_timeout=reset_timeout) + + def __repr__(self): + """ + used as the cache key. We use the rt_system_id to share the cache between servers in production + """ + if PY3: + return self.rt_system_id + try: + return self.rt_system_id.encode('utf-8', 'backslashreplace') + except: + return self.rt_system_id + + def _make_params(self, route_point): + """ + create params list for GET request + """ + stop_id_list = route_point.fetch_all_stop_id(self.object_id_tag) + if not stop_id_list: + logging.getLogger(__name__).debug( + 'missing realtime id for {obj}: stop code={s}'.format(obj=route_point, s=stop_id_list), + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + self.record_internal_failure('missing id') + return None + params = [('stop_id', i) for i in stop_id_list] + + direction_type = route_point.fetch_direction_type() + if direction_type: + params.append(("direction_type", direction_type)) + return params + + def _is_valid_direction(self, direction_uri, passage_direction_uri, group_by_dest): + # If group_by_dest is False then return True + # otherwise return the comparison result + if not group_by_dest: + return True + return direction_uri == passage_direction_uri + + @cache.memoize(app.config['CACHE_CONFIGURATION'].get('TIMEOUT_SYTRAL', 30)) + def _call(self, params): + """ + http call to Forseti + """ + logging.getLogger(__name__).debug( + 'Forseti RT service , call url : {}'.format(self.service_url), + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + try: + return self.breaker.call(requests.get, url=self.service_url, params=params, timeout=self.timeout) + except pybreaker.CircuitBreakerError as e: + logging.getLogger(__name__).error( + 'Forseti service dead, using base schedule (error: {}'.format(e), + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + raise RealtimeProxyError('circuit breaker open') + except requests.Timeout as t: + logging.getLogger(__name__).error( + 'Forseti service timeout, using base schedule (error: {}'.format(t), + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + raise RealtimeProxyError('timeout') + except Exception as e: + logging.getLogger(__name__).exception( + 'Forseti RT error, using base schedule', + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + raise RealtimeProxyError(str(e)) + + def _get_dt(self, datetime_str): + dt = aniso8601.parse_datetime(datetime_str) + + utc_dt = dt.astimezone(pytz.utc) + + return utc_dt + + def _get_passages(self, route_point, resp): + logging.getLogger(__name__).debug( + 'Forseti response: {}'.format(resp), extra={'rt_system_id': six.text_type(self.rt_system_id)} + ) + + line_ids = route_point.fetch_all_line_id(self.line_id_tag) + line_uri = route_point.fetch_line_uri() + direction_type = route_point.fetch_direction_type() + + departures = resp.get('departures', []) + next_passages = [] + for next_expected_st in departures: + if next_expected_st['line'] not in line_ids: + continue + if DIRECTION_MAPPING.get(direction_type) != next_expected_st.get('direction_type'): + continue + dt = self._get_dt(next_expected_st['datetime']) + direction_name = next_expected_st.get('direction_name') + is_real_time = next_expected_st.get('type') == 'E' + direction = self._get_direction( + line_uri=line_uri, object_code=next_expected_st.get('direction'), default_value=direction_name + ) + next_passage = RealTimePassage(dt, direction.label, is_real_time, direction.uri) + next_passages.append(next_passage) + + # If next_passages is empty return 0 to display base_schedule + if len(next_passages) == 0: + return None + + return next_passages + + def _get_next_passage_for_route_point( + self, route_point, count=None, from_dt=None, current_dt=None, duration=None + ): + params = self._make_params(route_point) + if not params: + return None + r = self._call(params) + + if r.status_code != requests.codes.ok: + logging.getLogger(__name__).error( + 'Forseti service unavailable, impossible to query : {}'.format(r.url), + extra={'rt_system_id': six.text_type(self.rt_system_id)}, + ) + raise RealtimeProxyError('non 200 response') + + return self._get_passages(route_point, r.json()) + + def status(self): + return { + 'id': six.text_type(self.rt_system_id), + 'timeout': self.timeout, + 'circuit_breaker': { + 'current_state': self.breaker.current_state, + 'fail_counter': self.breaker.fail_counter, + 'reset_timeout': self.breaker.reset_timeout, + }, + } + + def __eq__(self, other): + return self.rt_system_id == other.rt_system_id diff --git a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py index 1288025919..7030ee68aa 100644 --- a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py +++ b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py @@ -190,7 +190,9 @@ def _get_first_datetime(self, stop_schedule): def _update_stop_schedule(self, request, stop_schedule, next_realtime_passages, group_by_dest=False): """ - Update the stopschedule response with the new realtime passages + Update the response for /stop_schedules, /terminus_schedules with the new realtime passages + group_by_dest = False for /stop_schedules + group_by_dest = True for /terminus_schedules By default, all base schedule data is removed and replaced with realtime data. Each proxy can define its own way to merge passages. @@ -232,6 +234,9 @@ def _filter_base_passage(self, passage, route_point): return RoutePoint(passage.route, passage.stop_point) == route_point def _update_passages(self, passages, route_point, template, next_realtime_passages): + """ + Update the /departures response with the new realtime passages + """ if next_realtime_passages is None: return diff --git a/source/jormungandr/jormungandr/realtime_schedule/tests/forseti_tram_test.py b/source/jormungandr/jormungandr/realtime_schedule/tests/forseti_tram_test.py new file mode 100644 index 0000000000..240df46e38 --- /dev/null +++ b/source/jormungandr/jormungandr/realtime_schedule/tests/forseti_tram_test.py @@ -0,0 +1,269 @@ +# coding=utf-8 +# Copyright (c) 2001-2022, Hove and/or its affiliates. All rights reserved. +# +# This file is part of Navitia, +# the software to build cool stuff with public transport. +# +# Hope you'll enjoy and contribute to this project, +# powered by Hove (www.hove.com). +# Help us simplify mobility and open public transport: +# a non ending quest to the responsive locomotion way of traveling! +# +# LICENCE: This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# Stay tuned using +# twitter @navitia +# channel `#navitia` on riot https://riot.im/app/#/room/#navitia:matrix.org +# https://groups.google.com/d/forum/navitia +# www.navitia.io +from __future__ import absolute_import, print_function, division +import mock +from jormungandr.realtime_schedule.forseti_multi_stop import ForsetiMultiStop +from jormungandr.realtime_schedule.realtime_proxy import Direction +from jormungandr.tests.utils_test import MockRequests +import datetime +import pytz +import pytest +from jormungandr import ptref + + +class MockInstance: + def __init__(self): + self.ptref = ptref.PtRef(self) + + +def verify_attributes_in_connector_test(): + """ + Verify all attributes of connector + """ + forseti = ForsetiMultiStop(id='my_tram_rt', service_url='http://bob.com/', instance=MockInstance()) + assert forseti.rt_system_id == "my_tram_rt" + assert forseti.object_id_tag == "source" + assert forseti.destination_id_tag == "source" + assert forseti.line_id_tag == "source" + assert forseti.service_url == "http://bob.com/" + assert forseti.timeout == 2 + + forseti = ForsetiMultiStop( + id='my_tram_rt', + service_url='http://bob.com/', + object_id_tag='netex_monomodal_stopplace', + instance=MockInstance(), + ) + assert forseti.object_id_tag == "netex_monomodal_stopplace" + assert forseti.destination_id_tag == "source" + assert forseti.line_id_tag == "source" + + forseti = ForsetiMultiStop( + id='my_tram_rt', + service_url='http://bob.com/', + object_id_tag='netex_monomodal_stopplace', + destination_id_tag='netex_monomodal_stopplace', + instance=MockInstance(), + ) + assert forseti.object_id_tag == "netex_monomodal_stopplace" + assert forseti.destination_id_tag == "netex_monomodal_stopplace" + assert forseti.line_id_tag == "source" + + forseti = ForsetiMultiStop( + id='my_tram_rt', + service_url='http://bob.com/', + object_id_tag='netex_monomodal_stopplace', + destination_id_tag='netex_monomodal_stopplace', + line_id_tag='tag_pdl', + instance=MockInstance(), + ) + assert forseti.object_id_tag == "netex_monomodal_stopplace" + assert forseti.destination_id_tag == "netex_monomodal_stopplace" + assert forseti.line_id_tag == "tag_pdl" + + +def make_url_params_with_invalid_code_test(): + """ + test make_url when RoutePoint does not have a mandatory code + + we should not get any url + """ + forseti = ForsetiMultiStop(id='my_tram_rt', service_url='http://bob.com/', instance=MockInstance()) + params = forseti._make_params(MockRoutePoint(line_code='line_toto', stop_id=[])) + assert params is None + + +def make_url_params_test(): + forseti = ForsetiMultiStop(id='my_tram_rt', service_url='http://bob.com/', instance=MockInstance()) + + # The return is like [(stop_id, val1), (stop_id, val2), (direction_type, val), ...] + params = forseti._make_params(MockRoutePoint(line_code='line_1', stop_id='stop_1')) + assert params == [('stop_id', 'stop_1')] + + params = forseti._make_params(MockRoutePoint(line_code='line_1', stop_id=['stop_1', 'stop_2'])) + assert params == [('stop_id', 'stop_1'), ('stop_id', 'stop_2')] + + params = forseti._make_params( + MockRoutePoint(line_code='line_1', stop_id='stop_1', direction_type='forward') + ) + assert params == [('stop_id', 'stop_1'), ('direction_type', 'forward')] + + +class MockRoutePoint(object): + def __init__(self, *args, **kwargs): + l = kwargs['line_code'] + if isinstance(l, list): + self._hardcoded_line_ids = l + else: + self._hardcoded_line_ids = [l] + + l = kwargs['stop_id'] + if isinstance(l, list): + self._hardcoded_stop_ids = l + else: + self._hardcoded_stop_ids = [l] + + if 'direction_type' in kwargs: + self._hardcoded_direction_type = kwargs['direction_type'] + else: + self._hardcoded_direction_type = None + + def fetch_all_stop_id(self, object_id_tag): + return self._hardcoded_stop_ids + + def fetch_all_line_id(self, object_id_tag): + return self._hardcoded_line_ids + + def fetch_direction_type(self): + return self._hardcoded_direction_type + + def fetch_line_uri(self): + return "line:PDL:NM:Line:1:LOC" + + +class MockResponse(object): + def __init__(self, data, status_code, url, *args, **kwargs): + self.data = data + self.status_code = status_code + self.url = url + + def json(self): + return self.data + + +@pytest.fixture(scope="module") +def mock_multiline_response(): + return { + "departures": [ + { + "line": "NM:Line:1:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:18638", + "direction_name": "François Mitterrand", + "datetime": "2025-02-26T15:50:48+01:00", + "direction_type": "forward" + }, + { + "line": "NM:Line:1:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:19578", + "direction_name": "Jamet", + "datetime": "2025-02-26T15:54:00+01:00", + "direction_type": "forward" + }, + { + "line": "NM:Line:2:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:18590", + "direction_name": "Grand Val", + "datetime": "2025-02-26T15:54:12+01:00", + "direction_type": "backward" + }, + { + "line": "NM:Line:1:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:18920", + "direction_name": "Ranzay", + "datetime": "2025-02-26T16:02:06+01:00", + "direction_type": "backward" + }, + { + "line": "NM:Line:2:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:18590", + "direction_name": "Grand Val", + "datetime": "2025-02-26T16:05:00+01:00", + "direction_type": "forward" + }, + { + "line": "NM:Line:1:LOC", + "stop": "MOBIITI:StopPlace:18977", + "type": "E", + "direction": "MOBIITI:StopPlace:18638", + "direction_name": "François Mitterrand", + "datetime": "2025-02-26T16:15:06+01:00", + "direction_type": "forward" + } + ] + } + + +def next_passage_for_route_point_test(mock_multiline_response): + """ + test the whole next_passage_for_route_point + mock the http call to return a good response, we should get some next_passages + we match also direction_type = "forward" in the connector (forward = outbound / backward = inbound) + The connector keeps only date_times with line = NM:Line:1:LOC and direction_type = forward + """ + forseti = ForsetiMultiStop(id='my_tram_rt', service_url='http://bob.com/', instance=MockInstance()) + + mock_requests = MockRequests({'http://bob.com/?direction_type=forward&stop_id=MOBIITI%3AStopPlace%3A18977': (mock_multiline_response, 200)}) + + route_point = MockRoutePoint(line_code='NM:Line:1:LOC', stop_id='MOBIITI:StopPlace:18977', direction_type='forward') + + with mock.patch('requests.get', mock_requests.get): + with mock.patch( + 'jormungandr.realtime_schedule.forseti_multi_stop.ForsetiMultiStop._get_direction', + lambda ForsetiMultiStop, **kwargs: Direction("stop_area:PDL:MOBIITI:StopPlace:8730", "François Mitterrand (Saint-Herblain)"), + ): + passages = forseti.next_passage_for_route_point(route_point) + + assert len(passages) == 3 + + assert passages[0].datetime == datetime.datetime(2025, 2, 26, 14, 50, 48, tzinfo=pytz.UTC) + assert passages[0].is_real_time is True + assert passages[0].direction == "François Mitterrand (Saint-Herblain)" + assert passages[0].direction_uri == "stop_area:PDL:MOBIITI:StopPlace:8730" + + assert passages[1].datetime == datetime.datetime(2025, 2, 26, 14, 54, tzinfo=pytz.UTC) + assert passages[1].is_real_time is True + assert passages[1].direction == "François Mitterrand (Saint-Herblain)" + assert passages[1].direction_uri == "stop_area:PDL:MOBIITI:StopPlace:8730" + + assert passages[2].datetime == datetime.datetime(2025, 2, 26, 15, 15, 6, tzinfo=pytz.UTC) + assert passages[2].is_real_time is True + assert passages[2].direction == "François Mitterrand (Saint-Herblain)" + assert passages[2].direction_uri == "stop_area:PDL:MOBIITI:StopPlace:8730" + + +def status_test(): + forseti = ForsetiMultiStop( + id='my_tram_rt', + service_url='http://bob.com/', + service_args={'a': 'test', 'b': '12'}, + instance=MockInstance(), + ) + status = forseti.status() + assert status['id'] == "my_tram_rt"