Skip to content

Commit

Permalink
Reorganized event and messaging code.
Browse files Browse the repository at this point in the history
  • Loading branch information
TJohnsonAZ committed Feb 19, 2024
1 parent ab6bc0f commit e612775
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 155 deletions.
File renamed without changes.
1 change: 1 addition & 0 deletions doc/devlog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This folder is a handy place to put Jupyter notebooks or other documents which h
| 2023-11-22-ipm-probs.ipynb | Tyler | | Analyzing statistical correctness of our IPM processing algorithms. |
| 2023-12-05.ipynb | Tyler | | A brief tour of changes to epymorph due to the refactor effort. |
| 2024-01-08.ipynb | Tyler | | Another functional parameters demonstration, revisiting the Bonus Example from 2023-10-10. |
| 2024-02-06-adrio-demo.ipynb | Trevor | | Demonstrates the ADRIO system using code updated for latest changes. |

## Contributing

Expand Down
3 changes: 2 additions & 1 deletion epymorph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from epymorph.data import geo_library, ipm_library, mm_library
from epymorph.data_shape import Shapes
from epymorph.engine.standard_sim import StandardSimulation
from epymorph.logging.messaging import sim_messaging
from epymorph.plots import plot_event, plot_pop
from epymorph.proxy import dim, geo
from epymorph.simulation import SimDType, TimeFrame, default_rng, sim_messaging
from epymorph.simulation import SimDType, TimeFrame, default_rng

__all__ = [
'IPM',
Expand Down
18 changes: 8 additions & 10 deletions epymorph/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
from epymorph.error import UnknownModel
from epymorph.geo.adrio import adrio_maker_library
from epymorph.geo.cache import load_from_cache
from epymorph.geo.dynamic import (DynamicGeo, DynamicGeoFileOps,
dynamic_geo_messaging_sim)
from epymorph.geo.dynamic import DynamicGeo, DynamicGeoFileOps
from epymorph.geo.geo import Geo
from epymorph.geo.static import StaticGeoFileOps
from epymorph.initializer import initializer_library, normalize_init_params
from epymorph.logging.messaging import sim_messaging
from epymorph.movement.parser import MovementSpec, parse_movement_spec
from epymorph.simulation import (TimeFrame, default_rng, enable_logging,
sim_messaging)
from epymorph.simulation import TimeFrame, default_rng, enable_logging


def define_argparser(command_parser: _SubParsersAction):
Expand Down Expand Up @@ -159,12 +158,11 @@ def run(input_path: str,

# Run simulation with appropriate messaging contexts

with sim_messaging(sim):
if geo_messaging and isinstance(geo, DynamicGeo):
with dynamic_geo_messaging_sim(geo.ADRIO_start):
out = sim.run()
else:
out = sim.run()
if not (geo_messaging and type(geo) is DynamicGeo):
geo_messaging = False

with sim_messaging(sim, geo_messaging):
out = sim.run()

# Draw charts (if specified).
# NOTE: this method of chart handling is a placeholder implementation
Expand Down
3 changes: 3 additions & 0 deletions epymorph/engine/standard_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from epymorph.error import (AttributeException, CompilationException,
InitException, IpmSimException, MmSimException,
ValidationException, error_gate)
from epymorph.geo.dynamic import DynamicGeo
from epymorph.geo.geo import Geo
from epymorph.initializer import DEFAULT_INITIALIZER, Initializer
from epymorph.movement.movement_model import MovementModel
Expand Down Expand Up @@ -113,6 +114,8 @@ def __init__(self,
# events
self.on_start = Event()
self.on_tick = Event()
if type(geo) is DynamicGeo:
self.adrio_start = geo.adrio_start
self.on_end = Event()

def validate(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion epymorph/geo/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from epymorph.data import adrio_maker_library, geo_library_dynamic
from epymorph.geo.dynamic import DynamicGeo
from epymorph.geo.dynamic import DynamicGeoFileOps as DF
from epymorph.geo.dynamic import dynamic_geo_messaging
from epymorph.geo.geo import Geo
from epymorph.geo.static import StaticGeo
from epymorph.geo.static import StaticGeoFileOps as F
from epymorph.geo.util import convert_to_static_geo
from epymorph.logging.messaging import dynamic_geo_messaging

CACHE_PATH = user_cache_path(appname='epymorph', ensure_exists=True)

Expand Down
104 changes: 10 additions & 94 deletions epymorph/geo/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
"""
import os
from concurrent.futures import ThreadPoolExecutor, wait
from contextlib import contextmanager
from time import perf_counter
from typing import Generator, NamedTuple, Protocol, Self, runtime_checkable
from typing import Self

import numpy as np
from numpy.typing import NDArray
Expand All @@ -17,7 +15,8 @@
from epymorph.geo.geo import Geo
from epymorph.geo.spec import (LABEL, AttribDef, DynamicGeoSpec,
validate_geo_values)
from epymorph.util import Event, MemoDict, subscriptions
from epymorph.simulation import AdrioStart, FetchStart
from epymorph.util import Event, MemoDict


def _memoized_adrio_maker_library(lib: ADRIOMakerLibrary) -> MemoDict[str, ADRIOMaker]:
Expand Down Expand Up @@ -79,14 +78,14 @@ def __init__(self, spec: DynamicGeoSpec, adrios: dict[str, ADRIO]):

# events
self.fetch_start = Event()
self.ADRIO_start = Event()
self.adrio_start = Event()
self.fetch_end = Event()

def __getitem__(self, name: str) -> NDArray:
if name not in self._adrios:
raise AttributeException(f"Attribute not found in geo: '{name}'")
if self._adrios[name]._cached_value is None:
self.ADRIO_start.publish(ADRIO_Start(name))
self.adrio_start.publish(AdrioStart(name, None))
return self._adrios[name].get_value()

@property
Expand Down Expand Up @@ -120,14 +119,16 @@ def fetch_all(self) -> None:
"""Retrieves all Geo attributes from geospec object using ADRIOs"""
self.fetch_start.publish(FetchStart(len(self._adrios)))

def fetch_attribute(adrio: ADRIO) -> NDArray:
self.ADRIO_start.publish(ADRIO_Start(adrio.attrib))
def fetch_attribute(adrio: ADRIO, index: int) -> NDArray:
self.adrio_start.publish(AdrioStart(adrio.attrib, index))
return adrio.get_value()

# initialize threads
with ThreadPoolExecutor(max_workers=5) as executor:
index = 1
for adrio in self._adrios.values():
executor.submit(fetch_attribute, adrio)
executor.submit(fetch_attribute, adrio, index)
index += 1

self.fetch_end.publish(None)

Expand All @@ -151,88 +152,3 @@ def load_from_spec(file: os.PathLike, adrio_maker_library: ADRIOMakerLibrary) ->
except Exception as e:
msg = f"Unable to load '{file}' as a geo: {e}"
raise GeoValidationException(msg) from e


class FetchStart(NamedTuple):
"""The payload of a DynamicGeo fetch_start event."""
adrio_len: int


class ADRIO_Start(NamedTuple):
"""The payload of a DynamicGeo adrio_start event."""
attribute: str


@runtime_checkable
class DynamicGeoEvents(Protocol):
"""Protocol for DynamicGeos that support lifecycle events."""

fetch_start: Event[FetchStart]
"""
Event that fires when geo begins fetching attributes. Payload is the number of ADRIOs.
"""

ADRIO_start: Event[ADRIO_Start]
"""
Event that fires when an individual ADRIO begins data retreival. Payload is the current attribute name.
"""

fetch_end: Event[None]
"""
Event that fires when data retreival is complete.
"""


@contextmanager
def dynamic_geo_messaging_sim(dyn: Event) -> Generator[None, None, None]:
"""
Attach messaging to dynamic geo operations during a simulation run to inform user when
simulation halts to actively fetch data.
Creates subscription to ADRIO_Start event
"""
print("Geo not found in cache; geo attributes will be lazily loaded during simulation run.")

def adrio_start(adrio: ADRIO_Start) -> None:
print(f"Uncached geo attribute found: {adrio.attribute}. Retreiving now...")

with subscriptions() as subs:
subs.subscribe(dyn, adrio_start)
yield


@contextmanager
def dynamic_geo_messaging(dyn: DynamicGeoEvents) -> Generator[None, None, None]:
"""
Attach progress messaging to a DynamicGeo for verbose printing of data retreival progress.
Creates subscriptions on the Geo's events.
"""

start_time = 0.0
num_adrios = 0
adrio_index = 1

def fetch_start(length: FetchStart) -> None:
nonlocal num_adrios
num_adrios = length.adrio_len

print("Fetching dynamic geo data")
print(f"• {num_adrios} attributes")

nonlocal start_time
start_time = perf_counter()

def adrio_start(adrio: ADRIO_Start) -> None:
nonlocal adrio_index
print(f"Fetching {adrio.attribute}...[{adrio_index}/{num_adrios}]")
adrio_index += 1

def fetch_end(_: None) -> None:
print("Complete.")
end_time = perf_counter()
print(f"Total fetch time: {(end_time - start_time):.3f}s")

with subscriptions() as subs:
subs.subscribe(dyn.fetch_start, fetch_start)
subs.subscribe(dyn.ADRIO_start, adrio_start)
subs.subscribe(dyn.fetch_end, fetch_end)
yield
98 changes: 98 additions & 0 deletions epymorph/logging/messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from contextlib import contextmanager
from time import perf_counter
from typing import Generator

from epymorph.simulation import (AdrioStart, DynamicGeoEvents, FetchStart,
OnStart, SimTick, SimulationEvents)
from epymorph.util import progress, subscriptions


@contextmanager
def sim_messaging(sim: SimulationEvents, geo_messaging=False) -> Generator[None, None, None]:
"""
Attach fancy console messaging to a Simulation, e.g., a progress bar.
This creates subscriptions on `sim`'s events, so you only need to do it once
per sim. Returns `sim` as a convenience.
"""

start_time = 0.0
use_progress_bar = sim.on_tick is not None

if geo_messaging:
print("Geo not found in cache; geo attributes will be lazily loaded during simulation run.")

def on_start(ctx: OnStart) -> None:
start_date = ctx.time_frame.start_date
duration_days = ctx.time_frame.duration_days
end_date = ctx.time_frame.end_date

print(f"Running simulation ({sim.__class__.__name__}):")
print(f"• {start_date} to {end_date} ({duration_days} days)")
print(f"• {ctx.dim.nodes} geo nodes")
if use_progress_bar:
print(progress(0.0), end='\r')
else:
print('Running...')

nonlocal start_time
start_time = perf_counter()

def on_tick(tick: SimTick) -> None:
print(progress(tick.percent_complete), end='\r')

def adrio_start(adrio: AdrioStart) -> None:
print(f"Uncached geo attribute found: {adrio.attribute}. Retreiving now...")

def on_end(_: None) -> None:
end_time = perf_counter()
if use_progress_bar:
print(progress(1.0))
else:
print('Complete.')
print(f"Runtime: {(end_time - start_time):.3f}s")

# Set up a subscriptions context, subscribe our handlers,
# then yield to the outer context (ostensibly where the sim will be run).
with subscriptions() as subs:
subs.subscribe(sim.on_start, on_start)
if sim.on_tick is not None:
subs.subscribe(sim.on_tick, on_tick)
if geo_messaging:
subs.subscribe(sim.adrio_start, adrio_start)
subs.subscribe(sim.on_end, on_end)
yield


@contextmanager
def dynamic_geo_messaging(dyn: DynamicGeoEvents) -> Generator[None, None, None]:
"""
Attach progress messaging to a DynamicGeo for verbose printing of data retreival progress.
Creates subscriptions on the Geo's events.
"""

start_time = 0.0
num_adrios = 0

def fetch_start(length: FetchStart) -> None:
nonlocal num_adrios
num_adrios = length.adrio_len

print("Fetching dynamic geo data")
print(f"• {num_adrios} attributes")

nonlocal start_time
start_time = perf_counter()

def adrio_start(adrio: AdrioStart) -> None:
print(f"Fetching {adrio.attribute}...[{adrio.index}/{num_adrios}]")

def fetch_end(_: None) -> None:
print("Complete.")
end_time = perf_counter()
print(f"Total fetch time: {(end_time - start_time):.3f}s")

with subscriptions() as subs:
subs.subscribe(dyn.fetch_start, fetch_start)
subs.subscribe(dyn.adrio_start, adrio_start)
subs.subscribe(dyn.fetch_end, fetch_end)
yield
Loading

0 comments on commit e612775

Please sign in to comment.