diff --git a/doc/devlog/2024-02-06-adrio-demo.ipynb b/doc/devlog/2024-02-06-adrio-demo.ipynb new file mode 100644 index 00000000..1a4077b6 --- /dev/null +++ b/doc/devlog/2024-02-06-adrio-demo.ipynb @@ -0,0 +1,213 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from epymorph.geo.spec import DynamicGeoSpec, AttribDef, CentroidDType, Year\n", + "from epymorph.geo.adrio.census.adrio_census import CensusGeography, Granularity\n", + "from epymorph.data_shape import Shapes\n", + "from pathlib import Path\n", + "import numpy as np\n", + "\n", + "maricopa = DynamicGeoSpec(\n", + " attributes=[\n", + " AttribDef('label', np.str_, Shapes.N),\n", + " AttribDef('geoid', np.str_, Shapes.N),\n", + " AttribDef('centroid', CentroidDType, Shapes.N),\n", + " AttribDef('median_income', np.int64, Shapes.N),\n", + " AttribDef('tract_median_income', np.int64, Shapes.N),\n", + " AttribDef('population', np.int64, Shapes.N),\n", + " AttribDef('population_by_age', np.int64, Shapes.NxA(3)),\n", + " AttribDef('population_by_age_x6', np.int64, Shapes.NxA(6)),\n", + " AttribDef('pop_density_km2', np.float64, Shapes.N),\n", + " AttribDef('median_age', np.int64, Shapes.N),\n", + " AttribDef('average_household_size', np.int64, Shapes.N),\n", + " AttribDef('gini_index', np.float64, Shapes.N)\n", + " ],\n", + " geography=CensusGeography(granularity=Granularity.CBG, filter={\n", + " 'state': ['04'],\n", + " 'county': ['013'],\n", + " 'tract': ['*'],\n", + " 'block group': ['*']\n", + " }),\n", + " time_period=Year(2019),\n", + " source={\n", + " 'label': 'Census:name',\n", + " 'population': 'Census',\n", + " 'geoid': 'Census',\n", + " 'centroid': 'Census',\n", + " 'median_income': 'Census',\n", + " 'tract_median_income': 'Census',\n", + " 'population_by_age': 'Census', \n", + " 'population_by_age_x6': 'Census',\n", + " 'pop_density_km2': 'Census',\n", + " 'median_age': 'Census', \n", + " 'average_household_size': 'Census',\n", + " 'gini_index': 'Census'\n", + " }\n", + " )\n", + "\n", + "json = maricopa.serialize()\n", + "with open(Path('./scratch/geo/maricopa_cbg_2019.geo'), mode='w', encoding='utf-8') as f:\n", + " f.write(json)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "from epymorph.geo.dynamic import DynamicGeoFileOps\n", + "from epymorph.geo.adrio import adrio_maker_library\n", + "\n", + "geo = DynamicGeoFileOps.load_from_spec(Path('./scratch/geo/maricopa_cbg_2019.geo'), adrio_maker_library)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Fetching dynamic geo data\n", + "• 12 attributes\n", + "Fetching name...[1/12]\n", + "Fetching geoid...[2/12]\n", + "Fetching centroid...[3/12]\n", + "Fetching median_income...[4/12]\n", + "Fetching tract_median_income...[5/12]\n", + "Fetching population...[6/12]\n", + "Fetching population_by_age...[7/12]\n", + "Fetching population_by_age_x6...[8/12]\n", + "Fetching pop_density_km2...[9/12]\n", + "Fetching median_age...[10/12]\n", + "Fetching average_household_size...[11/12]\n", + "Fetching gini_index...[12/12]\n", + "Gini Index cannot be retrieved for block group level, fetching tract level data instead.\n", + "Complete.\n", + "Total fetch time: 27.819s\n" + ] + } + ], + "source": [ + "from epymorph.logging.messaging import dynamic_geo_messaging\n", + "\n", + "with dynamic_geo_messaging(geo):\n", + " geo.fetch_all()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Loading requirements:\n", + "[✓] IPM (sirs)\n", + "[✓] MM (centroids)\n", + "[✓] GEO (us_sw_counties_2015)\n", + "Running simulation (StandardSimulation):\n", + "• 2010-01-01 to 2010-05-31 (150 days)\n", + "• 158 geo nodes\n", + "|####################| 100% \n", + "Runtime: 12.335s\n", + "Done\n" + ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from epymorph.cli.run import run\n", + "\n", + "run(input_path='./scratch/adrio_test.toml',\n", + " out_path=None,\n", + " chart=None,\n", + " profiling=False,\n", + " ignore_cache=False,\n", + " geo_messaging=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Fetching dynamic geo data\n", + "• 8 attributes\n", + "Fetching name...[1/8]\n", + "Fetching population...[2/8]\n", + "Fetching population_by_age...[3/8]\n", + "Fetching centroid...[4/8]\n", + "Fetching geoid...[5/8]\n", + "Fetching dissimilarity_index...[6/8]\n", + "Fetching median_income...[7/8]\n", + "Fetching pop_density_km2...[8/8]\n", + "Complete.\n", + "Total fetch time: 22.568s\n", + "geo sucessfully cached.\n" + ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from epymorph.cli.cache import fetch\n", + "\n", + "fetch('us_sw_counties_2015', True)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.7" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/devlog/README.md b/doc/devlog/README.md index c76bd88a..c25dfd7f 100644 --- a/doc/devlog/README.md +++ b/doc/devlog/README.md @@ -41,6 +41,7 @@ This folder is a handy place to put Jupyter notebooks or other documents which h | 2023-11-22-ipm-probs.ipynb | Tyler | | Analyzing statistical correctness of our IPM processing algorithms. | | 2023-12-05.ipynb | Tyler | | A brief tour of changes to epymorph due to the refactor effort. | | 2024-01-08.ipynb | Tyler | | Another functional parameters demonstration, revisiting the Bonus Example from 2023-10-10. | +| 2024-02-06-adrio-demo.ipynb | Trevor | | Demonstrates the ADRIO system using code updated for latest changes. | | 2024-02-06.ipynb | Tyler | | Revisiting age-class IPMs, and thinking about modularity of approach. | | 2024-02-12.ipynb | Tyler | | Continued age-class IPM work, this time in more than one geo node. | diff --git a/epymorph/__init__.py b/epymorph/__init__.py index 2129ac06..d9b26f72 100644 --- a/epymorph/__init__.py +++ b/epymorph/__init__.py @@ -4,9 +4,10 @@ from epymorph.data import geo_library, ipm_library, mm_library from epymorph.data_shape import Shapes from epymorph.engine.standard_sim import StandardSimulation +from epymorph.logging.messaging import sim_messaging from epymorph.plots import plot_event, plot_pop from epymorph.proxy import dim, geo -from epymorph.simulation import SimDType, TimeFrame, default_rng, sim_messaging +from epymorph.simulation import SimDType, TimeFrame, default_rng __all__ = [ 'IPM', diff --git a/epymorph/cli/cache.py b/epymorph/cli/cache.py index 3e1c2d60..f68348fe 100644 --- a/epymorph/cli/cache.py +++ b/epymorph/cli/cache.py @@ -31,6 +31,10 @@ def define_argparser(command_parser: _SubParsersAction): 'geo', type=str, help='the name of the geo to fetch; must include a geo path if not already in the library') + fetch_command.add_argument( + '-p', '--path', + help='(optional) the path to a geo spec file not in the library' + ) fetch_command.add_argument( '-f', '--force', action='store_true', diff --git a/epymorph/cli/run.py b/epymorph/cli/run.py index e3e91ef7..10835526 100644 --- a/epymorph/cli/run.py +++ b/epymorph/cli/run.py @@ -24,9 +24,9 @@ from epymorph.geo.geo import Geo from epymorph.geo.static import StaticGeoFileOps from epymorph.initializer import initializer_library, normalize_init_params +from epymorph.logging.messaging import sim_messaging from epymorph.movement.parser import MovementSpec, parse_movement_spec -from epymorph.simulation import (TimeFrame, default_rng, enable_logging, - sim_messaging) +from epymorph.simulation import TimeFrame, default_rng, enable_logging def define_argparser(command_parser: _SubParsersAction): @@ -52,14 +52,19 @@ def define_argparser(command_parser: _SubParsersAction): p.add_argument( '-i', '--ignore_cache', action='store_true', - help='(optional) include this flag to run the simulation without utilizing the Geo cache.' - ) + help='(optional) include this flag to run the simulation without utilizing the Geo cache.') + p.add_argument( + '-m', '--mute_geo', + action='store_false', + help='(optional) include this flag to silence geo data retreival messaging.') + p.set_defaults(handler=lambda args: run( input_path=args.input, out_path=args.out, chart=args.chart, profiling=args.profile, - ignore_cache=args.ignore_cache + ignore_cache=args.ignore_cache, + geo_messaging=args.mute_geo )) @@ -79,7 +84,8 @@ def run(input_path: str, out_path: str | None, chart: str | None, profiling: bool, - ignore_cache: bool) -> int: + ignore_cache: bool, + geo_messaging: bool) -> int: """CLI command handler: run a simulation.""" # Exit codes: @@ -150,7 +156,9 @@ def run(input_path: str, if not profiling: enable_logging() - with sim_messaging(sim): + # Run simulation with appropriate messaging contexts + + with sim_messaging(sim, geo_messaging): out = sim.run() # Draw charts (if specified). diff --git a/epymorph/engine/standard_sim.py b/epymorph/engine/standard_sim.py index a347a06e..9927b4d8 100644 --- a/epymorph/engine/standard_sim.py +++ b/epymorph/engine/standard_sim.py @@ -93,6 +93,7 @@ class StandardSimulation(SimulationEvents): _config: RumeConfig _params: ContextParams | None = None + geo: Geo on_tick: Event[SimTick] # this class supports on_tick; so narrow the type def def __init__(self, @@ -103,6 +104,7 @@ def __init__(self, time_frame: TimeFrame, initializer: Initializer | None = None, rng: Callable[[], np.random.Generator] | None = None): + self.geo = geo if initializer is None: initializer = DEFAULT_INITIALIZER if rng is None: diff --git a/epymorph/geo/adrio/census/adrio_census.py b/epymorph/geo/adrio/census/adrio_census.py index 82dce842..0a276f9f 100644 --- a/epymorph/geo/adrio/census/adrio_census.py +++ b/epymorph/geo/adrio/census/adrio_census.py @@ -7,7 +7,7 @@ from census import Census from geopandas import GeoDataFrame from numpy.typing import NDArray -from pandas import DataFrame, concat, read_excel +from pandas import DataFrame, read_excel from pygris import block_groups, counties, states, tracts from epymorph.data_shape import Shapes @@ -370,10 +370,6 @@ def fetch_commuters(self, granularity: int, nodes: dict[str, list[str]], year: i data = data.loc[data['wrk_state_code'] < '057'] data = data.loc[data['wrk_state_code'] != '011'] - # filter out non-county locations - data = data.loc[data['res_county_code'] < '508'] - data = data.loc[data['wrk_county_code'] < '508'] - if granularity == Granularity.COUNTY.value: if counties is not None and counties[0] != '*': data = data.loc[data['res_county_code'].isin(counties)] diff --git a/epymorph/geo/cache.py b/epymorph/geo/cache.py index 1f707dd0..301c32d8 100644 --- a/epymorph/geo/cache.py +++ b/epymorph/geo/cache.py @@ -11,6 +11,7 @@ from epymorph.geo.static import StaticGeo from epymorph.geo.static import StaticGeoFileOps as F from epymorph.geo.util import convert_to_static_geo +from epymorph.logging.messaging import dynamic_geo_messaging CACHE_PATH = user_cache_path(appname='epymorph', ensure_exists=True) @@ -31,7 +32,8 @@ def fetch(geo_name_or_path: str) -> None: geo_load = geo_library_dynamic.get(geo_name_or_path) if geo_load is not None: geo = geo_load() - static_geo = convert_to_static_geo(geo) + with dynamic_geo_messaging(geo): + static_geo = convert_to_static_geo(geo) static_geo.save(file_path) # checks for geo spec at given path (path passed) @@ -41,7 +43,8 @@ def fetch(geo_name_or_path: str) -> None: geo_name = geo_path.stem file_path = CACHE_PATH / F.to_archive_filename(geo_name) geo = DF.load_from_spec(geo_path, adrio_maker_library) - static_geo = convert_to_static_geo(geo) + with dynamic_geo_messaging(geo): + static_geo = convert_to_static_geo(geo) static_geo.save(file_path) else: raise GeoCacheException(f'spec file at {geo_name_or_path} not found.') diff --git a/epymorph/geo/dynamic.py b/epymorph/geo/dynamic.py index dd7fed0a..2bfe6ec5 100644 --- a/epymorph/geo/dynamic.py +++ b/epymorph/geo/dynamic.py @@ -15,7 +15,8 @@ from epymorph.geo.geo import Geo from epymorph.geo.spec import (LABEL, AttribDef, DynamicGeoSpec, validate_geo_values) -from epymorph.util import MemoDict +from epymorph.simulation import AdrioStart, DynamicGeoEvents, FetchStart +from epymorph.util import Event, MemoDict def _memoized_adrio_maker_library(lib: ADRIOMakerLibrary) -> MemoDict[str, ADRIOMaker]: @@ -32,7 +33,7 @@ def load_maker(name: str) -> ADRIOMaker: return MemoDict[str, ADRIOMaker](load_maker) -class DynamicGeo(Geo[DynamicGeoSpec]): +class DynamicGeo(Geo[DynamicGeoSpec], DynamicGeoEvents): """A Geo implementation which uses ADRIOs to dynamically fetch data from third-party data sources.""" @classmethod @@ -75,9 +76,16 @@ def __init__(self, spec: DynamicGeoSpec, adrios: dict[str, ADRIO]): labels = self._adrios[LABEL.name].get_value() super().__init__(spec, len(labels)) + # events + self.fetch_start = Event() + self.adrio_start = Event() + self.fetch_end = Event() + def __getitem__(self, name: str) -> NDArray: if name not in self._adrios: raise AttributeException(f"Attribute not found in geo: '{name}'") + if self._adrios[name]._cached_value is None: + self.adrio_start.publish(AdrioStart(name, None, None)) return self._adrios[name].get_value() @property @@ -109,17 +117,19 @@ def fetch(key: str, adrio: ADRIO) -> tuple[str, NDArray]: def fetch_all(self) -> None: """Retrieves all Geo attributes from geospec object using ADRIOs""" - print('Fetching GEO data from ADRIOs...') + num_adrios = len(self._adrios) + self.fetch_start.publish(FetchStart(num_adrios)) - def fetch_attribute(adrio: ADRIO) -> NDArray: - print(f'Fetching {adrio.attrib}') + def fetch_attribute(adrio: ADRIO, index: int) -> NDArray: + self.adrio_start.publish(AdrioStart(adrio.attrib, index, num_adrios)) return adrio.get_value() # initialize threads with ThreadPoolExecutor(max_workers=5) as executor: - for adrio in self._adrios.values(): - executor.submit(fetch_attribute, adrio) - print('...done') + for index, adrio in enumerate(self._adrios.values()): + executor.submit(fetch_attribute, adrio, index) + + self.fetch_end.publish(None) class DynamicGeoFileOps: diff --git a/epymorph/geo/util.py b/epymorph/geo/util.py index 0417626d..1fed5448 100644 --- a/epymorph/geo/util.py +++ b/epymorph/geo/util.py @@ -12,6 +12,7 @@ def convert_to_static_geo(geo: DynamicGeo) -> StaticGeo: attributes=geo.spec.attributes, time_period=geo.spec.time_period, ) + geo.fetch_all() values = { attr.name: geo[attr.name] for attr in geo.spec.attributes diff --git a/epymorph/logging/messaging.py b/epymorph/logging/messaging.py new file mode 100644 index 00000000..70e65965 --- /dev/null +++ b/epymorph/logging/messaging.py @@ -0,0 +1,111 @@ +from contextlib import contextmanager +from time import perf_counter +from typing import Generator + +from epymorph.simulation import (AdrioStart, DynamicGeoEvents, FetchStart, + OnStart, SimTick, SimulationEvents) +from epymorph.util import progress, subscriptions + + +@contextmanager +def sim_messaging(sim: SimulationEvents, geo_messaging=False) -> Generator[None, None, None]: + """ + Attach fancy console messaging to a Simulation, e.g., a progress bar. + This creates subscriptions on `sim`'s events, so you only need to do it once + per sim. Returns `sim` as a convenience. + If `geo_messaging` is true, provide verbose messaging about geo operations + (if applicable, e.g., when fetching external data). + """ + + start_time: float | None = None + use_progress_bar = sim.on_tick is not None + + # If geo_messaging is true, the user has requested verbose messaging re: geo operations. + # However we don't want to make a strong assertion that a sim has a geo, nor what type that geo is. + # So we'll do this dynamically! + # - if we have a geo, and + # - if it's an instance of DynamicGeoEvents, and + # - if the user has enabled geo messaging, then and only then will we subscribe to its adrio_start event + sim_geo = None + if hasattr(sim, 'geo'): + sim_geo = getattr(sim, 'geo') + + if geo_messaging and isinstance(sim_geo, DynamicGeoEvents): + print("Geo not loaded from cache; attributes will be lazily loaded during simulation run.") + + def on_start(ctx: OnStart) -> None: + start_date = ctx.time_frame.start_date + duration_days = ctx.time_frame.duration_days + end_date = ctx.time_frame.end_date + + print(f"Running simulation ({sim.__class__.__name__}):") + print(f"• {start_date} to {end_date} ({duration_days} days)") + print(f"• {ctx.dim.nodes} geo nodes") + if use_progress_bar: + print(progress(0.0), end='\r') + else: + print('Running...') + + nonlocal start_time + start_time = perf_counter() + + def on_tick(tick: SimTick) -> None: + print(progress(tick.percent_complete), end='\r') + + def adrio_start(adrio: AdrioStart) -> None: + print(f"Uncached geo attribute requested: {adrio.attribute}. Retreiving now...") + + def on_end(_: None) -> None: + end_time = perf_counter() + if use_progress_bar: + print(progress(1.0)) + else: + print('Complete.') + if start_time is not None: + print(f"Runtime: {(end_time - start_time):.3f}s") + + # Set up a subscriptions context, subscribe our handlers, + # then yield to the outer context (ostensibly where the sim will be run). + with subscriptions() as subs: + subs.subscribe(sim.on_start, on_start) + if sim.on_tick is not None: + subs.subscribe(sim.on_tick, on_tick) + if geo_messaging and isinstance(sim_geo, DynamicGeoEvents): + subs.subscribe(sim_geo.adrio_start, adrio_start) + subs.subscribe(sim.on_end, on_end) + yield # to outer context + + +@contextmanager +def dynamic_geo_messaging(dyn: DynamicGeoEvents) -> Generator[None, None, None]: + """ + Attach progress messaging to a DynamicGeo for verbose printing of data retreival progress. + Creates subscriptions on the Geo's events. + """ + + start_time: float | None = None + + def fetch_start(event: FetchStart) -> None: + print("Fetching dynamic geo data") + print(f"• {event.adrio_len} attributes") + + nonlocal start_time + start_time = perf_counter() + + def adrio_start(event: AdrioStart) -> None: + msg = f"Fetching {event.attribute}..." + if event.index is not None and event.adrio_len is not None: + msg = f"{msg} [{event.index + 1}/{event.adrio_len}]" + print(msg) + + def fetch_end(_: None) -> None: + print("Complete.") + end_time = perf_counter() + if start_time is not None: + print(f"Total fetch time: {(end_time - start_time):.3f}s") + + with subscriptions() as subs: + subs.subscribe(dyn.fetch_start, fetch_start) + subs.subscribe(dyn.adrio_start, adrio_start) + subs.subscribe(dyn.fetch_end, fetch_end) + yield # to outer context diff --git a/epymorph/simulation.py b/epymorph/simulation.py index 7d5b37e4..9f6dac26 100644 --- a/epymorph/simulation.py +++ b/epymorph/simulation.py @@ -1,20 +1,17 @@ """General simulation data types, events, and utility functions.""" import logging -from contextlib import contextmanager from dataclasses import dataclass from datetime import date, timedelta from functools import partial from importlib import reload -from time import perf_counter -from typing import (Any, Callable, Generator, NamedTuple, Protocol, Self, - Sequence, runtime_checkable) +from typing import (Any, Callable, NamedTuple, Protocol, Self, Sequence, + runtime_checkable) import numpy as np from numpy.random import SeedSequence from epymorph.code import ImmutableNamespace, base_namespace -from epymorph.util import (Event, pairwise_haversine, progress, row_normalize, - subscriptions) +from epymorph.util import Event, pairwise_haversine, row_normalize SimDType = np.int64 """ @@ -131,9 +128,27 @@ class SimTick(NamedTuple): percent_complete: float +class FetchStart(NamedTuple): + """The payload of a DynamicGeo fetch_start event.""" + adrio_len: int + + +class AdrioStart(NamedTuple): + """The payload of a DynamicGeo adrio_start event.""" + attribute: str + index: int | None + """An index assigned to this ADRIO if fetching ADRIOs as a batch.""" + adrio_len: int | None + """The total number of ADRIOs being fetched if fetching ADRIOs as a batch.""" + + @runtime_checkable class SimulationEvents(Protocol): - """Protocol for Simulations that support lifecycle events.""" + """ + Protocol for Simulations that support lifecycle events. + For correct operation, ensure that `on_start` is fired first, + then `on_tick` at least once, then finally `on_end`. + """ on_start: Event[OnStart] """ @@ -153,52 +168,28 @@ class SimulationEvents(Protocol): """ -@contextmanager -def sim_messaging(sim: SimulationEvents) -> Generator[None, None, None]: +@runtime_checkable +class DynamicGeoEvents(Protocol): """ - Attach fancy console messaging to a Simulation, e.g., a progress bar. - This creates subscriptions on `sim`'s events, so you only need to do it once - per sim. Returns `sim` as a convenience. + Protocol for DynamicGeos that support lifecycle events. + For correct operation, ensure that `fetch_start` is fired first, + then `adrio_start` any number of times, then finally `fetch_end`. """ - start_time = 0.0 - use_progress_bar = sim.on_tick is not None - - def on_start(ctx: OnStart) -> None: - start_date = ctx.time_frame.start_date - duration_days = ctx.time_frame.duration_days - end_date = ctx.time_frame.end_date - - print(f"Running simulation ({sim.__class__.__name__}):") - print(f"• {start_date} to {end_date} ({duration_days} days)") - print(f"• {ctx.dim.nodes} geo nodes") - if use_progress_bar: - print(progress(0.0), end='\r') - else: - print('Running...') - - nonlocal start_time - start_time = perf_counter() - - def on_tick(tick: SimTick) -> None: - print(progress(tick.percent_complete), end='\r') - - def on_end(_: None) -> None: - end_time = perf_counter() - if use_progress_bar: - print(progress(1.0)) - else: - print('Complete.') - print(f"Runtime: {(end_time - start_time):.3f}s") - - # Set up a subscriptions context, subscribe our handlers, - # then yield to the outer context (ostensibly where the sim will be run). - with subscriptions() as subs: - subs.subscribe(sim.on_start, on_start) - if sim.on_tick is not None: - subs.subscribe(sim.on_tick, on_tick) - subs.subscribe(sim.on_end, on_end) - yield + fetch_start: Event[FetchStart] + """ + Event that fires when geo begins fetching attributes. Payload is the number of ADRIOs. + """ + + adrio_start: Event[AdrioStart] + """ + Event that fires when an individual ADRIO begins data retreival. Payload is the attribute name and index. + """ + + fetch_end: Event[None] + """ + Event that fires when data retreival is complete. + """ def enable_logging(filename: str = 'debug.log', movement: bool = True) -> None: