From 509e2159b7b9e5a361cdb309550486bac9ecd523 Mon Sep 17 00:00:00 2001 From: Tyler Coles Date: Fri, 8 Mar 2024 17:52:59 -0700 Subject: [PATCH] Added context for collecting movement data. And a devlog demonstration. --- doc/devlog/2024-03-13.ipynb | 425 ++++++++++++++++++++++++ doc/devlog/README.md | 1 + epymorph/engine/mm_exec.py | 88 +++-- epymorph/engine/standard_sim.py | 33 +- epymorph/engine/test/world_list_test.py | 2 +- epymorph/engine/world.py | 4 +- epymorph/engine/world_hypercube.py | 13 +- epymorph/engine/world_list.py | 26 +- epymorph/event.py | 200 +++++++++++ epymorph/geo/dynamic.py | 4 +- epymorph/log/file.py | 102 ++++++ epymorph/log/messaging.py | 6 +- epymorph/log/movement.py | 135 ++++++++ epymorph/simulation.py | 81 +---- epymorph/util.py | 5 + 15 files changed, 985 insertions(+), 140 deletions(-) create mode 100644 doc/devlog/2024-03-13.ipynb create mode 100644 epymorph/event.py create mode 100644 epymorph/log/file.py create mode 100644 epymorph/log/movement.py diff --git a/doc/devlog/2024-03-13.ipynb b/doc/devlog/2024-03-13.ipynb new file mode 100644 index 00000000..abafc576 --- /dev/null +++ b/doc/devlog/2024-03-13.ipynb @@ -0,0 +1,425 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# devlog 2024-02-13\n", + "\n", + "_Author: Tyler Coles_\n", + "\n", + "Demonstration of how to use the new movement data collection feature to examine the operation of a movement model.\n", + "\n", + "## What data is there to collect?\n", + "\n", + "The execution of the movement model is really answering two questions every time a movement clause fires:\n", + "\n", + "1) how many individuals (regardless of IPM state) were _requested_ to move from every node to every other node, and\n", + "2) how many individuals (divided by IPM state) were _actually_ moved from every node to every other node.\n", + "\n", + "The total number of individuals involved in these two answers can vary, especially if there were not enough individuals in a source node to satisfy the requested movement.\n", + "\n", + "Requested movement (for user-defined clauses) is the direct result of the movement clause function. Return movement is done internally by epymorph, so the \"return clause\" is virtual, in a sense, though its data is also captured by this mechanism.\n", + "\n", + "In movement execution, the requested movement is converted into actual movement by performing random draws on the individuals present in a location. For example, if we request 100 people to move, we might choose 75 Susceptibles, 20 Infectious, and 5 Recovered (in an SIR model). If there are less than 100 people in that location, we will choose all of them, whichever IPM state they are in.\n", + "\n", + "## How do we collect movement data?\n", + "\n", + "If we run our simulation in a `movement_data` context, it will provide us a `MovementData` object which we can use to access both _requested_ and _actual_ movement flows by clause name or in total.\n", + "\n", + "Let's see an example: first we'll set up a simulation using the \"centroids\" movement model, then run it to collect movement data, and finally inspect the collected data to give you an idea of what you can do with it." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from functools import partial\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import pandas as pd\n", + "import pygris as pg\n", + "\n", + "from epymorph import *\n", + "from epymorph.geo.cache import load_from_cache\n", + "from epymorph.geo.geo import Geo\n", + "from epymorph.initializer import single_location\n", + "from epymorph.log.movement import movement_data" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Running simulation (StandardSimulation):\n", + "• 2015-01-01 to 2015-04-11 (100 days)\n", + "• 158 geo nodes\n", + "|####################| 100% \n", + "Runtime: 6.332s\n" + ] + } + ], + "source": [ + "def load_geo(name: str) -> Geo:\n", + " cached = load_from_cache(name)\n", + " if not cached is None:\n", + " return cached\n", + " else:\n", + " return geo_library[name]()\n", + "\n", + "\n", + "geo = load_geo('us_sw_counties_2015')\n", + "ipm = ipm_library['sirs']()\n", + "mm = mm_library['centroids']()\n", + "\n", + "sim = StandardSimulation(\n", + " geo, ipm, mm,\n", + " params={\n", + " 'phi': 40.0,\n", + " 'beta': 0.4,\n", + " 'gamma': 1 / 5,\n", + " 'xi': 1 / 90,\n", + " },\n", + " time_frame=TimeFrame.of(\"2015-01-01\", 100),\n", + " initializer=partial(single_location, location=0, seed_size=1_000),\n", + " rng=default_rng(42),\n", + ")\n", + "\n", + "# Python lets us run code in a block with multiple context managers,\n", + "# so we'll do both the console messaging and collection of movement data.\n", + "# The `move_data` object returned by the movement_data context is where\n", + "# the movement data will be collected.\n", + "# NOTE: we should only access its data _after_ the context completes.\n", + "with (\n", + " sim_messaging(sim, True),\n", + " movement_data(sim) as move_data,\n", + "):\n", + " out = sim.run()\n", + "\n", + "# Now we're free to use `move_data`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Analysis\n", + "\n", + "1. Which counties \"exported\" the most infectious cases?\n", + "\n", + "To find out, we can sum all \"I\" individuals leaving each node across the whole time series. There's only one clause that is responsible for people leaving their home county, and that's \"centroids_commuters\", so we focus just on the actual movement from that clause." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
labelexported
0Maricopa County, Arizona790588
1Salt Lake County, Utah355211
2Clark County, Nevada274661
3El Paso County, Colorado238660
4Denver County, Colorado238172
5Bernalillo County, New Mexico229522
6Arapahoe County, Colorado219040
7Jefferson County, Colorado209478
8Pima County, Arizona190433
9Utah County, Utah185672
\n", + "
" + ], + "text/plain": [ + " label exported\n", + "0 Maricopa County, Arizona 790588\n", + "1 Salt Lake County, Utah 355211\n", + "2 Clark County, Nevada 274661\n", + "3 El Paso County, Colorado 238660\n", + "4 Denver County, Colorado 238172\n", + "5 Bernalillo County, New Mexico 229522\n", + "6 Arapahoe County, Colorado 219040\n", + "7 Jefferson County, Colorado 209478\n", + "8 Pima County, Arizona 190433\n", + "9 Utah County, Utah 185672" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Get actual commuters movement flows\n", + "commuters = move_data.actual_by('centroids_commuters')\n", + "\n", + "# Select just the Infectious compartment\n", + "# NOTE: `compartments_by` returns a tuple, so this result still has 4 axes; we'll deal with it in the next step.\n", + "commuters_I = commuters[:, :, :, ipm.compartments_by('I')]\n", + "\n", + "# Compute the totals leaving each source node, regardless of destination node, and across time.\n", + "exported_I = commuters_I.sum(axis=(0, 2, 3))\n", + "\n", + "# `argsort` gives us the indices that would sort the array, so we can use this to subselect.\n", + "# Sort the negative of the array to get descending order.\n", + "top_ten = np.argsort(-exported_I)[0:10]\n", + "\n", + "# Chart of the top 10 \"exporters\"\n", + "# I think we can safely say the units for the exported column are \"people-days\",\n", + "# since each individual is counted every time they commute as long as they are infectious,\n", + "# but we know from the movement mechanics that they will only commute up to once per day.\n", + "# It's a little esoteric, but gives us a basis for comparison.\n", + "pd.DataFrame({\n", + " 'label': geo.labels[top_ten],\n", + " 'exported': exported_I[top_ten],\n", + "})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "2. Let's draw a map.\n", + "\n", + "That isn't a question, but sure! Maybe we'd like to see which counties exported the most infectious person-days relative to the total person-days they exported." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "commuters = move_data.actual_by('centroids_commuters')\n", + "\n", + "df = pd.DataFrame({\n", + " 'GEOID': geo['geoid'],\n", + " 'I': commuters[..., ipm.compartments_by('I')].sum(axis=(0, 2, 3)),\n", + " 'N': commuters.sum(axis=(0, 2, 3)),\n", + "})\n", + "df['I:N'] = df['I'] / df['N']\n", + "\n", + "gdf_counties = pg.counties(cb=True, resolution='5m', cache=True, year=2020)\n", + "\n", + "gdf = pd.merge(\n", + " on=\"GEOID\",\n", + " how=\"right\",\n", + " left=gdf_counties[['GEOID', 'geometry']],\n", + " right=df,\n", + ")\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "ax.axis('off')\n", + "ax.set_title(\"Exported person-days by county: ratio of infectious to total\")\n", + "gdf.plot(ax=ax, column='I:N', cmap='Purples', legend=True, vmin=0, vmax=0.06)\n", + "gdf.plot(ax=ax, color='none', edgecolor='white', linewidth=1, alpha=0.2)\n", + "fig.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "3. When are the most infectious individuals moving around?\n", + "\n", + "We could certainly sum across the geography and look at time-series data. This time we'll lump commuting movement and returning movement together -- we're interested in how many infectious individuals are moving even if they catch the infection \"at work\" and bring it home. Again, the units of this aren't easily applicable, but the magnitude is all we need for a comparison." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "movers = move_data.actual_all()\n", + "\n", + "T, N, _, E = out.dim.TNCE\n", + "D = out.dim.days\n", + "taus = out.dim.tau_steps\n", + "\n", + "# Select infected movers and sum by simulation tick\n", + "movers_I = movers[..., ipm.compartments_by('I')].sum(axis=(1, 2, 3))\n", + "\n", + "# Then we group-and-sum to get a daily total\n", + "movers_I_per_day = movers_I.reshape((T // taus, taus)).sum(axis=1)\n", + "\n", + "fig, ax = plt.subplots(figsize=(8, 6))\n", + "ax.set_title(\"Total movement events involving infectious individuals\")\n", + "ax.set_xlabel('day')\n", + "ax.set_ylabel('movements')\n", + "ax.plot(np.arange(D), movers_I_per_day)\n", + "fig.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "4. How can we check that everyone who leaves due to the commuters clause returns home at the end of the day, like we expect?\n", + "\n", + "In other words: how do we know the centroids movement model works?" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "all equal!\n" + ] + } + ], + "source": [ + "# Commuters should be leaving their homes at the start of the day-time tau-step (tau-0)...\n", + "commuters = move_data.actual_by('centroids_commuters')\n", + "# And returning home at the end of the day (aka, the start of the night-time tau-step; tau-1).\n", + "returners = move_data.actual_by('return')\n", + "\n", + "# There are no other clauses in this movement model to worry about,\n", + "# but more complicated models may not be as easy as this to analyze.\n", + "\n", + "# Compare tau-0 to tau-1 for every day in the sim:\n", + "for t in range(0, out.dim.ticks, 2):\n", + " # Sum commuters by source node,\n", + " a = commuters[t + 0].sum(axis=(1, 2))\n", + " # But sum returners by destination node!\n", + " b = returners[t + 1].sum(axis=(0, 2))\n", + " # This way we're correctly comparing everyone who left with everyone who came back.\n", + "\n", + " # I use the sum here because the returners might not have the exact same IPM state distribution\n", + " # as when they left; the IPM execution may have transitioned some of them to different states.\n", + " # Only the count of individuals can be guaranteed to be the same.\n", + "\n", + " # We've found an error if there are any differences between `a` and `b`.\n", + " if not np.array_equal(a, b):\n", + " print(f\"not equal at {t}\")\n", + " break\n", + "else:\n", + " # Otherwise, no errors means success!\n", + " print(\"all equal!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/devlog/README.md b/doc/devlog/README.md index d20d6294..47e09bd4 100644 --- a/doc/devlog/README.md +++ b/doc/devlog/README.md @@ -45,6 +45,7 @@ This folder is a handy place to put Jupyter notebooks or other documents which h | 2024-02-06.ipynb | Tyler | | Revisiting age-class IPMs, and thinking about modularity of approach. | | 2024-02-12.ipynb | Tyler | | Continued age-class IPM work, this time in more than one geo node. | | 2024-03-01.ipynb | Tyler | | Getting the indices of IPM events and compartments by name with wildcard support. | +| 2024-03-13.ipynb | Tyler | | Showing off movement data collection (NEW!) | ## Contributing diff --git a/epymorph/engine/mm_exec.py b/epymorph/engine/mm_exec.py index 3c4cbce4..cb2187f9 100644 --- a/epymorph/engine/mm_exec.py +++ b/epymorph/engine/mm_exec.py @@ -2,7 +2,6 @@ Movement executor classes handle the logic for processing the movement step of the simulation. """ from abc import ABC, abstractmethod -from logging import DEBUG, Logger, getLogger import numpy as np from numpy.typing import NDArray @@ -10,6 +9,8 @@ from epymorph.engine.context import RumeContext from epymorph.engine.world import World from epymorph.error import AttributeException, MmCompileException +from epymorph.event import (MovementEventsMixin, OnMovementClause, + OnMovementFinish, OnMovementStart) from epymorph.movement.compile import compile_spec from epymorph.movement.movement_model import (MovementModel, PredefParams, TravelClause) @@ -35,17 +36,17 @@ def apply(self, world: World, tick: Tick) -> None: ############################################################ -class StandardMovementExecutor(MovementExecutor): +class StandardMovementExecutor(MovementEventsMixin, MovementExecutor): """The standard implementation of movement model execution.""" _ctx: RumeContext - _log: Logger _model: MovementModel _clause_masks: dict[TravelClause, NDArray[np.bool_]] _predef: PredefParams = {} _predef_hash: int | None = None def __init__(self, ctx: RumeContext): + super().__init__() # If we were given a MovementSpec, we need to compile it to get its clauses. if isinstance(ctx.mm, MovementSpec): self._model = compile_spec(ctx.mm, ctx.rng) @@ -53,7 +54,6 @@ def __init__(self, ctx: RumeContext): self._model = ctx.mm self._ctx = ctx - self._log = getLogger('movement') self._clause_masks = {c: c.mask(ctx) for c in self._model.clauses} self._check_predef() @@ -76,11 +76,14 @@ def _check_predef(self) -> None: def apply(self, world: World, tick: Tick) -> None: """Applies movement for this tick, mutating the world state.""" - self._log.debug('Processing movement for day %s, step %s', tick.day, tick.step) + + self.on_movement_start.publish( + OnMovementStart(tick.index, tick.day, tick.step)) self._check_predef() # Process travel clauses. + total = 0 for clause in self._model.clauses: if not clause.predicate(self._ctx, tick): continue @@ -89,10 +92,30 @@ def apply(self, world: World, tick: Tick) -> None: returns = clause.returns(self._ctx, tick) return_tick = self._ctx.resolve_tick(tick, returns) world.apply_travel(travelers, return_tick) + total += travelers.sum() # Process return clause. - return_movers = world.apply_return(tick) - self._log.getChild('Return').debug("moved %d", return_movers) + # return_requested, return_actual, return_total = world.apply_return(tick) + return_movers = world.apply_return(tick, return_stats=True) + if return_movers is not None: + return_total = return_movers.sum() + total += return_total + + self.on_movement_clause.publish( + OnMovementClause( + tick.index, + tick.day, + tick.step, + "return", + return_movers, + return_movers, + return_total, + False, + ) + ) + + self.on_movement_finish.publish( + OnMovementFinish(tick.index, tick.day, tick.step, total)) def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[SimDType]) -> NDArray[SimDType]: """ @@ -101,38 +124,35 @@ def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[Si then selects exactly which individuals (by compartment) should move. Returns an (N,N,C) array; from-source-to-destination-by-compartment. """ - clause_log = self._log.getChild(clause.name) _, N, C, _ = self._ctx.dim.TNCE - requested_movers = clause.requested(self._ctx, self._predef, tick) - np.fill_diagonal(requested_movers, 0) - requested_sum = requested_movers.sum(axis=1, dtype=SimDType) + clause_movers = clause.requested(self._ctx, self._predef, tick) + np.fill_diagonal(clause_movers, 0) + clause_sum = clause_movers.sum(axis=1, dtype=SimDType) available_movers = local_cohorts * self._clause_masks[clause] available_sum = available_movers.sum(axis=1, dtype=SimDType) - # If requested total is greater than the total available, + # If clause requested total is greater than the total available, # use mvhg to select as many as possible. - throttled = False - for src in range(N): - if requested_sum[src] > available_sum[src]: - msg = " movement throttled for insufficient population at %d" - clause_log.debug(msg, src) - requested_movers[src, :] = self._ctx.rng.multivariate_hypergeometric( - colors=requested_movers[src, :], - nsample=available_sum[src] - ) - throttled = True - - # Update sum if it changed in the previous step. - if throttled: + if not np.any(clause_sum > available_sum): + throttled = False + requested_movers = clause_movers + requested_sum = clause_sum + else: + throttled = True + requested_movers = clause_movers.copy() + for src in range(N): + if clause_sum[src] > available_sum[src]: + requested_movers[src, :] = self._ctx.rng.multivariate_hypergeometric( + colors=requested_movers[src, :], + nsample=available_sum[src] + ) requested_sum = requested_movers.sum(axis=1, dtype=SimDType) # The probability a mover from a src will go to a dst. requested_prb = row_normalize(requested_movers, requested_sum, dtype=SimDType) - clause_log.debug("requested_movers:\n%s", requested_movers) - travelers_cs = np.zeros((N, N, C), dtype=SimDType) for src in range(N): if requested_sum[src] == 0: @@ -151,7 +171,17 @@ def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[Si requested_prb[src, :] ).T.astype(SimDType) - if clause_log.isEnabledFor(DEBUG): - clause_log.debug("moved %d", requested_sum.sum()) + self.on_movement_clause.publish( + OnMovementClause( + tick.index, + tick.day, + tick.step, + clause.name, + clause_movers, + travelers_cs, + requested_sum.sum(), + throttled, + ) + ) return travelers_cs diff --git a/epymorph/engine/standard_sim.py b/epymorph/engine/standard_sim.py index d66994c8..085fb021 100644 --- a/epymorph/engine/standard_sim.py +++ b/epymorph/engine/standard_sim.py @@ -16,14 +16,15 @@ from epymorph.error import (AttributeException, CompilationException, InitException, IpmSimException, MmSimException, ValidationException, error_gate) +from epymorph.event import (MovementEventsMixin, OnStart, OnTick, + SimulationEventsMixin) from epymorph.geo.geo import Geo from epymorph.initializer import DEFAULT_INITIALIZER, Initializer from epymorph.movement.movement_model import MovementModel, validate_mm from epymorph.movement.parser import MovementSpec from epymorph.params import ContextParams, Params -from epymorph.simulation import (OnStart, SimDimensions, SimDType, SimTick, - SimulationEvents, TimeFrame) -from epymorph.util import Event +from epymorph.simulation import SimDimensions, SimDType, TimeFrame +from epymorph.util import Subscriber @dataclass @@ -88,13 +89,12 @@ def ticks_in_days(self) -> NDArray[np.float64]: return np.cumsum(np.tile(self.dim.tau_step_lengths, self.dim.days), dtype=np.float64) -class StandardSimulation(SimulationEvents): +class StandardSimulation(SimulationEventsMixin, MovementEventsMixin): """Runs singular simulation passes, producing time-series output.""" _config: RumeConfig _params: ContextParams | None = None geo: Geo - on_tick: Event[SimTick] # this class supports on_tick; so narrow the type def def __init__(self, geo: Geo, @@ -104,6 +104,9 @@ def __init__(self, time_frame: TimeFrame, initializer: Initializer | None = None, rng: Callable[[], np.random.Generator] | None = None): + SimulationEventsMixin.__init__(self) + MovementEventsMixin.__init__(self) + self.geo = geo if initializer is None: initializer = DEFAULT_INITIALIZER @@ -112,11 +115,6 @@ def __init__(self, self._config = RumeConfig(geo, ipm, mm, params, time_frame, initializer, rng) - # events - self.on_start = Event() - self.on_tick = Event() - self.on_end = Event() - def validate(self) -> None: """Validate the simulation.""" with error_gate("validating the simulation", ValidationException, CompilationException): @@ -148,11 +146,22 @@ def run(self) -> Output: Run the simulation. It is safe to call this multiple times to run multiple independent simulations with the same configuraiton. """ + event_subs = Subscriber() + with error_gate("compiling the simulation", CompilationException): ctx = RumeContext.from_config(self._config) ipm_exec = StandardIpmExecutor(ctx) movement_exec = StandardMovementExecutor(ctx) + # Proxy the movement_exec's events, if anyone is listening for them. + if MovementEventsMixin.has_subscribers(self): + event_subs.subscribe(movement_exec.on_movement_start, + self.on_movement_start.publish) + event_subs.subscribe(movement_exec.on_movement_clause, + self.on_movement_clause.publish) + event_subs.subscribe(movement_exec.on_movement_finish, + self.on_movement_finish.publish) + with error_gate("initializing the simulation", InitException): ini = ctx.initialize() world = ListWorld.from_initials(ini) @@ -173,9 +182,11 @@ def run(self) -> Output: out.prevalence[tick.index] = tick_prevalence t = tick.index - self.on_tick.publish(SimTick(t, (t + 1) / ctx.dim.ticks)) + self.on_tick.publish(OnTick(t, (t + 1) / ctx.dim.ticks)) self.on_end.publish(None) + + event_subs.unsubscribe() return out diff --git a/epymorph/engine/test/world_list_test.py b/epymorph/engine/test/world_list_test.py index 014a4005..1c7805f1 100644 --- a/epymorph/engine/test/world_list_test.py +++ b/epymorph/engine/test/world_list_test.py @@ -216,7 +216,7 @@ def test_return(self): Cohort(np.array([19]), 0, 2), ]]) - world.apply_return(Tick(1, 1, date(2023, 1, 1), 0, 1.0)) + world.apply_return(Tick(1, 1, date(2023, 1, 1), 0, 1.0), return_stats=False) self.assertWorld( world.locations, diff --git a/epymorph/engine/world.py b/epymorph/engine/world.py index eb3c4cc0..d9eb4d09 100644 --- a/epymorph/engine/world.py +++ b/epymorph/engine/world.py @@ -46,7 +46,9 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None: """ @abstractmethod - def apply_return(self, tick: Tick) -> int: + def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None: """ Modify the world state as a result of returning all movers that are ready to be returned home. + If `return_stats` is True, returns an NxNxC array containing the individuals moved during the return. + Otherwise returns None. """ diff --git a/epymorph/engine/world_hypercube.py b/epymorph/engine/world_hypercube.py index 47f0c7d3..1cbbb377 100644 --- a/epymorph/engine/world_hypercube.py +++ b/epymorph/engine/world_hypercube.py @@ -107,11 +107,14 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None: self.ledger[return_tick + 1, :, :, :] += travelers self.time_frontier = max(self.time_frontier, return_tick + 2) - def apply_return(self, tick: Tick) -> int: - total = self.ledger[self.time_offset + 1, :, :, :].sum(dtype=SimDType) - movers = self.ledger[self.time_offset + 1, :, :, :].sum( + def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None: + # we have to transpose the movers "stats" result since they're being stored here as + # (home, visiting) and our result needs to be + # (moving from "visiting", moving to "home") + movers = self.ledger[self.time_offset + 1, :, :, :].transpose((1, 0, 2)).copy() + movers_by_home = self.ledger[self.time_offset + 1, :, :, :].sum( axis=1, dtype=SimDType) * self._ident - self.ledger[self.time_offset + 1, :, :, :] = movers + \ + self.ledger[self.time_offset + 1, :, :, :] = movers_by_home + \ self.ledger[self.time_offset, :, :, :] self.time_offset += 1 # assumes there's only ever one return clause per tick - return total + return movers if return_stats else None diff --git a/epymorph/engine/world_list.py b/epymorph/engine/world_list.py index c6772951..7ccbb2bb 100644 --- a/epymorph/engine/world_list.py +++ b/epymorph/engine/world_list.py @@ -1,6 +1,6 @@ """World implementation: ListWorld.""" from operator import attrgetter -from typing import Iterable, Self +from typing import Any, Iterable, Self import numpy as np from numpy.typing import NDArray @@ -64,6 +64,7 @@ class ListWorld(World): """The value of a population's `return_tick` when the population is home.""" nodes: int + compartments: int locations: list[list[Cohort]] @classmethod @@ -79,6 +80,7 @@ def from_initials(cls, initial_compartments: NDArray[SimDType]) -> Self: def __init__(self, locations: list[list[Cohort]]): self.nodes = len(locations) + self.compartments = len(locations[0][0].compartments) self.locations = locations def normalize(self) -> None: @@ -133,22 +135,28 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None: self.normalize() - def apply_return(self, tick: Tick) -> int: - total_movers = 0 + def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None: + movers: Any = None + if return_stats: + size = (self.nodes, self.nodes, self.compartments) + movers = np.zeros(size, dtype=SimDType) + next_locations = [[locals] for locals in self.get_local_cohorts()] - for loc_index, loc in enumerate(self.locations): - for cohort in loc: + for i, location in enumerate(self.locations): + for cohort in location: if cohort.return_tick == ListWorld.HOME_TICK: # locals are already where they need to be continue elif cohort.return_tick == tick.index: # cohort ready to go home, merge with locals - next_locations[cohort.return_location][0].compartments += cohort.compartments - total_movers += cohort.compartments.sum() + j = cohort.return_location + next_locations[j][0].compartments += cohort.compartments + if return_stats: + movers[i, j, :] = cohort.compartments else: # cohort staying - next_locations[loc_index].append(cohort) + next_locations[i].append(cohort) self.locations = next_locations self.normalize() - return total_movers + return movers if return_stats else None diff --git a/epymorph/event.py b/epymorph/event.py new file mode 100644 index 00000000..5680f459 --- /dev/null +++ b/epymorph/event.py @@ -0,0 +1,200 @@ +""" +epymorph's event frameworks. +The idea is to have a set of classes which define event protocols for +logical components of epymorph. +""" +from typing import NamedTuple, Protocol, runtime_checkable + +from numpy.typing import NDArray + +from epymorph.simulation import SimDimensions, SimDType, TimeFrame +from epymorph.util import Event + +# Simulation Events + + +class OnStart(NamedTuple): + """The payload of a Simulation on_start event.""" + dim: SimDimensions + time_frame: TimeFrame + + +class OnTick(NamedTuple): + """The payload of a Simulation tick event.""" + tick_index: int + percent_complete: float + + +@runtime_checkable +class SimulationEvents(Protocol): + """ + Protocol for Simulations that support lifecycle events. + For correct operation, ensure that `on_start` is fired first, + then `on_tick` at least once, then finally `on_end`. + """ + + on_start: Event[OnStart] + """ + Event fires at the start of a simulation run. Payload is a subset of the context for this run. + """ + + on_tick: Event[OnTick] + """ + Event which fires after each tick has been processed. + Event payload is a tuple containing the tick index just completed (an integer), + and the percentage complete (a float). + """ + + # TODO: rename `on_end` to `on_finish`. + + on_end: Event[None] + """ + Event fires after a simulation run is complete. + """ + + +class SimulationEventsMixin(SimulationEvents): + """A mixin implementation of the SimulationEvents protocol which initializes the events.""" + + def __init__(self): + self.on_start = Event() + self.on_tick = Event() + self.on_end = Event() + + def has_subscribers(self) -> bool: + """True if there is at least one subscriber on any simulation event.""" + return self.on_start.has_subscribers \ + or self.on_tick.has_subscribers \ + or self.on_end.has_subscribers + + +# Movement Events + + +class OnMovementStart(NamedTuple): + """The payload for the event when movement processing starts for a tick.""" + tick: int + """Which simulation tick.""" + day: int + """Which simulation day.""" + step: int + """Which tau step (by index).""" + + +class OnMovementClause(NamedTuple): + """The payload for the event when a single movement clause has been processed.""" + tick: int + """Which simulation tick.""" + day: int + """Which simulation day.""" + step: int + """Which tau step (by index).""" + clause_name: str + """The clause processed.""" + requested: NDArray[SimDType] + """ + The number of individuals this clause 'wants' to move, that is, the values returned by its clause funcction. + (An NxN array.) + """ + actual: NDArray[SimDType] + """The actual number of individuals moved, by source, destination, and compartment. (An NxNxC array.)""" + total: int + """The number of individuals moved by this clause.""" + is_throttled: bool + """Did the clause request to move more people than were available (at any location)?""" + + +class OnMovementFinish(NamedTuple): + """The payload for the event when movement processing finishes for one simulation tick.""" + tick: int + """Which simulation tick.""" + day: int + """Which simulation day.""" + step: int + """Which tau step (by index).""" + total: int + """The total number of individuals moved during this tick.""" + + +@runtime_checkable +class MovementEvents(Protocol): + """ + Mixin for Simulations that support movement events. + For correct operation, ensure that `on_movement_start` is fired first, + then `on_movement_clause` any number of times, then finally `on_movement_finish`. + """ + + on_movement_start: Event[OnMovementStart] + """ + Event fires at the start of the movement processing phase for every simulation tick. + """ + + on_movement_clause: Event[OnMovementClause] + """ + Event fires after every movement clause has been processed, excluding clauses that are not triggered in this tick. + """ + + on_movement_finish: Event[OnMovementFinish] + """ + Event fires at the end of the movement processing phase for every simulation tick. + """ + + +class MovementEventsMixin(MovementEvents): + """A mixin implementation of the MovementEvents protocol which initializes the events.""" + + def __init__(self): + self.on_movement_start = Event() + self.on_movement_clause = Event() + self.on_movement_finish = Event() + + def has_subscribers(self) -> bool: + """True if there is at least one subscriber on any movement event.""" + return self.on_movement_start.has_subscribers \ + or self.on_movement_clause.has_subscribers \ + or self.on_movement_finish.has_subscribers + + +class SimWithEvents(SimulationEvents, MovementEvents, Protocol): + """Intersection type of SimulationEvents and MovementEvents""" + + +# Geo/ADRIO Events + + +class FetchStart(NamedTuple): + """The payload of a DynamicGeo fetch_start event.""" + adrio_len: int + + +class AdrioStart(NamedTuple): + """The payload of a DynamicGeo adrio_start event.""" + attribute: str + index: int | None + """An index assigned to this ADRIO if fetching ADRIOs as a batch.""" + adrio_len: int | None + """The total number of ADRIOs being fetched if fetching ADRIOs as a batch.""" + + +@runtime_checkable +class DynamicGeoEvents(Protocol): + """ + Protocol for DynamicGeos that support lifecycle events. + For correct operation, ensure that `fetch_start` is fired first, + then `adrio_start` any number of times, then finally `fetch_end`. + """ + + fetch_start: Event[FetchStart] + """ + Event that fires when geo begins fetching attributes. Payload is the number of ADRIOs. + """ + + adrio_start: Event[AdrioStart] + """ + Event that fires when an individual ADRIO begins data retreival. Payload is the attribute name and index. + """ + + fetch_end: Event[None] + """ + Event that fires when data retreival is complete. + """ diff --git a/epymorph/geo/dynamic.py b/epymorph/geo/dynamic.py index c9183c83..1d2084be 100644 --- a/epymorph/geo/dynamic.py +++ b/epymorph/geo/dynamic.py @@ -11,12 +11,12 @@ from numpy.typing import NDArray from epymorph.error import AttributeException, GeoValidationException +from epymorph.event import AdrioStart, DynamicGeoEvents, FetchStart from epymorph.geo.adrio.adrio import ADRIO, ADRIOMaker, ADRIOMakerLibrary from epymorph.geo.geo import Geo from epymorph.geo.spec import (LABEL, AttribDef, DynamicGeoSpec, validate_geo_values) -from epymorph.simulation import (AdrioStart, AttributeArray, DynamicGeoEvents, - FetchStart) +from epymorph.simulation import AttributeArray from epymorph.util import Event, MemoDict diff --git a/epymorph/log/file.py b/epymorph/log/file.py new file mode 100644 index 00000000..ea8451ef --- /dev/null +++ b/epymorph/log/file.py @@ -0,0 +1,102 @@ +"""For logging epymorph simulations to a file.""" +from contextlib import contextmanager +from logging import (BASIC_FORMAT, DEBUG, NOTSET, FileHandler, Formatter, + getLogger) +from time import perf_counter +from typing import Generator + +from epymorph.event import (AdrioStart, DynamicGeoEvents, OnMovementClause, + OnMovementFinish, OnMovementStart, OnStart, OnTick, + SimWithEvents) +from epymorph.util import subscriptions + + +@contextmanager +def file_log( + sim: SimWithEvents, + log_file: str = 'debug.log', + log_level: str | int = DEBUG, +) -> Generator[None, None, None]: + """Attach file logging to a simulation.""" + + log_handler = FileHandler(log_file, "w", "utf8") + log_handler.setFormatter(Formatter(BASIC_FORMAT)) + + epy_log = getLogger('epymorph') + epy_log.addHandler(log_handler) + epy_log.setLevel(log_level) + + sim_log = epy_log.getChild('sim') + geo_log = epy_log.getChild('geo') + mm_log = epy_log.getChild('movement') + + # Define handlers for each of the events we're interested in. + + start_time: float | None = None + + def on_start(ctx: OnStart) -> None: + start_date = ctx.time_frame.start_date + duration_days = ctx.time_frame.duration_days + end_date = ctx.time_frame.end_date + + sim_log.info(f"Running simulation ({sim.__class__.__name__}):") + sim_log.info(f"- {start_date} to {end_date} ({duration_days} days)") + sim_log.info(f"- {ctx.dim.nodes} geo nodes") + + nonlocal start_time + start_time = perf_counter() + + def on_tick(tick: OnTick) -> None: + sim_log.info("Completed simulation tick %d", tick.tick_index) + + def on_end(_: None) -> None: + sim_log.info('Complete.') + end_time = perf_counter() + if start_time is not None: + sim_log.info(f"Runtime: {(end_time - start_time):.3f}s") + + def adrio_start(adrio: AdrioStart) -> None: + geo_log.debug( + "Uncached geo attribute requested: %s. Retreiving now.", adrio.attribute) + + def on_movement_start(e: OnMovementStart) -> None: + mm_log.info("Processing movement for day %d, step %d.", e.day, e.step) + + def on_movement_clause(e: OnMovementClause) -> None: + cl_log = mm_log.getChild(e.clause_name) + if e.total > 0: + cl_log.debug("requested:\n%s", e.requested) + if e.is_throttled: + cl_log.debug( + "WARNING: movement is throttled due to insufficient population") + cl_log.debug("moved:\n%s", e.actual) + cl_log.info("moved %d individuals", e.total) + + def on_movement_finish(e: OnMovementFinish) -> None: + mm_log.info(f"Moved a total of {e.total} individuals.") + + # Set up a subscriptions context, subscribe our handlers, + # then yield to the outer context (where the sim should be run). + with subscriptions() as subs: + # Simulation logging + subs.subscribe(sim.on_start, on_start) + if sim.on_tick is not None: + subs.subscribe(sim.on_tick, on_tick) + subs.subscribe(sim.on_end, on_end) + + # Geo logging will be attached if it makes sense. + sim_geo = getattr(sim, 'geo', None) + if isinstance(sim_geo, DynamicGeoEvents): + geo_log.info( + "Geo not loaded from cache; attributes will be lazily loaded during simulation run.") + subs.subscribe(sim_geo.adrio_start, adrio_start) + + # Movement logging + subs.subscribe(sim.on_movement_start, on_movement_start) + subs.subscribe(sim.on_movement_clause, on_movement_clause) + subs.subscribe(sim.on_movement_finish, on_movement_finish) + + yield # to outer context + + epy_log.removeHandler(log_handler) + epy_log.setLevel(NOTSET) diff --git a/epymorph/log/messaging.py b/epymorph/log/messaging.py index 70e65965..a242e4e6 100644 --- a/epymorph/log/messaging.py +++ b/epymorph/log/messaging.py @@ -2,8 +2,8 @@ from time import perf_counter from typing import Generator -from epymorph.simulation import (AdrioStart, DynamicGeoEvents, FetchStart, - OnStart, SimTick, SimulationEvents) +from epymorph.event import (AdrioStart, DynamicGeoEvents, FetchStart, OnStart, + OnTick, SimulationEvents) from epymorph.util import progress, subscriptions @@ -49,7 +49,7 @@ def on_start(ctx: OnStart) -> None: nonlocal start_time start_time = perf_counter() - def on_tick(tick: SimTick) -> None: + def on_tick(tick: OnTick) -> None: print(progress(tick.percent_complete), end='\r') def adrio_start(adrio: AdrioStart) -> None: diff --git a/epymorph/log/movement.py b/epymorph/log/movement.py new file mode 100644 index 00000000..45fa9ee3 --- /dev/null +++ b/epymorph/log/movement.py @@ -0,0 +1,135 @@ +"""For capturing extremely detailed movement data from a simulation.""" +from abc import abstractmethod +from contextlib import contextmanager +from typing import Generator, NamedTuple, Protocol + +import numpy as np +from numpy.typing import NDArray + +from epymorph.event import OnMovementClause, OnStart, SimWithEvents +from epymorph.simulation import SimDimensions, SimDType +from epymorph.util import subscriptions + + +class MovementData(Protocol): + """ + A data collection of simulation movement data. + Run the simulation inside a `movement_data` context and + an instance of this class will be returned. After the simulation + has completed, you can use this object to retrieve movement data + either by clause or in aggregate across all clauses. + + A note about axis ordering: both `requested` and `actual` data + includes a pair of axes that are the length of the number of geo nodes + in the simulation (N). Because these data represent movement flows we + use the convention that the first N represents where the movement is "from" and + the second N represents where the movement is "to". This is true regardless + of which clause is responsible. Returning movement from the return clause is treated + no differently than outgoing movement from a user-defined clause. + """ + + @abstractmethod + def requested_by(self, clause: str) -> NDArray[SimDType]: + """The time series of requested movement by clause. Array shape: (T,N,N)""" + + @abstractmethod + def actual_by(self, clause: str) -> NDArray[SimDType]: + """The time series of actual movement by clause. Array shape: (T,N,N,C)""" + + @abstractmethod + def requested_all(self) -> NDArray[SimDType]: + """The time series of requested movement for all clauses. Array shape: (T,N,N)""" + + @abstractmethod + def actual_all(self) -> NDArray[SimDType]: + """The time series of actual movement for all clauses. Array shape: (T,N,N,C)""" + + +class _Entry(NamedTuple): + """The data associated with a movement clause firing on a given tick.""" + name: str + tick: int + data: NDArray[SimDType] + + +class _MovementDataBuilder(MovementData): + """ + The mechanics of a context require that, in order to return a value, we have to + supply that value at the time we yield to the context body. Therefore we need + this builder object so we can provide a stand-in that will be populated as the + context body runs. We keep a `ready` flag to prevent access to the data before + the simulation (and the context) has completed. + """ + + ready: bool + dim: SimDimensions | None + requested: list[_Entry] + actual: list[_Entry] + + def __init__(self): + self.ready = False + self.dim = None + self.requested = [] + self.actual = [] + + def _get_dim(self) -> SimDimensions: + """Checks that the class is in a valid state and, if so, returns SimDimensions.""" + if not self.ready or self.dim is None: + msg = "Invalid state: MovementData cannot be accessed until the simulation is complete." + raise RuntimeError(msg) + return self.dim + + def requested_by(self, clause: str) -> NDArray[SimDType]: + T, N, _, _ = self._get_dim().TNCE + result = np.zeros((T, N, N), dtype=SimDType) + for (name, tick, data) in self.requested: + if name == clause: + result[tick, :, :] = data + return result + + def actual_by(self, clause: str) -> NDArray[SimDType]: + T, N, C, _ = self._get_dim().TNCE + result = np.zeros((T, N, N, C), dtype=SimDType) + for (name, tick, data) in self.actual: + if name == clause: + result[tick, :, :, :] = data + return result + + def requested_all(self) -> NDArray[SimDType]: + T, N, _, _ = self._get_dim().TNCE + result = np.zeros((T, N, N), dtype=SimDType) + for (_name, tick, data) in self.requested: + result[tick, :, :] += data + return result + + def actual_all(self) -> NDArray[SimDType]: + T, N, C, _ = self._get_dim().TNCE + result = np.zeros((T, N, N, C), dtype=SimDType) + for (_name, tick, data) in self.actual: + result[tick, :, :, :] += data + return result + + +@contextmanager +def movement_data(sim: SimWithEvents) -> Generator[MovementData, None, None]: + """ + Run a simulation in this context in order to collect detailed movement data + throughout the simulation run. This returns a MovementData object which + can be used -- after this context is exits -- to retrieve the movement data. + """ + md = _MovementDataBuilder() + + def on_start(e: OnStart): + nonlocal md + md.dim = e.dim + + def on_clause(e: OnMovementClause): + nonlocal md + md.requested.append(_Entry(e.clause_name, e.tick, e.requested)) + md.actual.append(_Entry(e.clause_name, e.tick, e.actual)) + + with subscriptions() as sub: + sub.subscribe(sim.on_start, on_start) + sub.subscribe(sim.on_movement_clause, on_clause) + yield md + md.ready = True diff --git a/epymorph/simulation.py b/epymorph/simulation.py index cd85bf76..dac8f641 100644 --- a/epymorph/simulation.py +++ b/epymorph/simulation.py @@ -5,15 +5,14 @@ from datetime import date, timedelta from functools import partial from importlib import reload -from typing import (Any, Callable, NamedTuple, Protocol, Self, Sequence, - runtime_checkable) +from typing import Any, Callable, NamedTuple, Protocol, Self, Sequence import numpy as np from numpy.random import SeedSequence from numpy.typing import NDArray from epymorph.code import ImmutableNamespace, base_namespace -from epymorph.util import Event, pairwise_haversine, row_normalize +from epymorph.util import pairwise_haversine, row_normalize SimDType = np.int64 """ @@ -132,82 +131,6 @@ def build(cls, tau_step_lengths: Sequence[float], days: int, nodes: int, compart """ -class OnStart(NamedTuple): - """The payload of a Simulation on_start event.""" - dim: SimDimensions - time_frame: TimeFrame - - -class SimTick(NamedTuple): - """The payload of a Simulation tick event.""" - tick_index: int - percent_complete: float - - -class FetchStart(NamedTuple): - """The payload of a DynamicGeo fetch_start event.""" - adrio_len: int - - -class AdrioStart(NamedTuple): - """The payload of a DynamicGeo adrio_start event.""" - attribute: str - index: int | None - """An index assigned to this ADRIO if fetching ADRIOs as a batch.""" - adrio_len: int | None - """The total number of ADRIOs being fetched if fetching ADRIOs as a batch.""" - - -@runtime_checkable -class SimulationEvents(Protocol): - """ - Protocol for Simulations that support lifecycle events. - For correct operation, ensure that `on_start` is fired first, - then `on_tick` at least once, then finally `on_end`. - """ - - on_start: Event[OnStart] - """ - Event fires at the start of a simulation run. Payload is a subset of the context for this run. - """ - - on_tick: Event[SimTick] | None - """ - Optional event which fires after each tick has been processed. - Event payload is a tuple containing the tick index just completed (an integer), - and the percentage complete (a float). - """ - - on_end: Event[None] - """ - Event fires after a simulation run is complete. - """ - - -@runtime_checkable -class DynamicGeoEvents(Protocol): - """ - Protocol for DynamicGeos that support lifecycle events. - For correct operation, ensure that `fetch_start` is fired first, - then `adrio_start` any number of times, then finally `fetch_end`. - """ - - fetch_start: Event[FetchStart] - """ - Event that fires when geo begins fetching attributes. Payload is the number of ADRIOs. - """ - - adrio_start: Event[AdrioStart] - """ - Event that fires when an individual ADRIO begins data retreival. Payload is the attribute name and index. - """ - - fetch_end: Event[None] - """ - Event that fires when data retreival is complete. - """ - - def enable_logging(filename: str = 'debug.log', movement: bool = True) -> None: """Enable simulation logging to file.""" reload(logging) diff --git a/epymorph/util.py b/epymorph/util.py index c2807636..66bd1c39 100644 --- a/epymorph/util.py +++ b/epymorph/util.py @@ -305,6 +305,11 @@ def publish(self, event: T) -> None: for subscriber in self._subscribers: subscriber(event) + @property + def has_subscribers(self) -> bool: + """True if at least one listener is subscribed to this event.""" + return len(self._subscribers) > 0 + class Subscriber: """