Skip to content

Commit

Permalink
Added context for collecting movement data.
Browse files Browse the repository at this point in the history
And a devlog demonstration.
  • Loading branch information
Tyler Coles committed Mar 13, 2024
1 parent df83f22 commit 509e215
Show file tree
Hide file tree
Showing 15 changed files with 985 additions and 140 deletions.
425 changes: 425 additions & 0 deletions doc/devlog/2024-03-13.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions doc/devlog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This folder is a handy place to put Jupyter notebooks or other documents which h
| 2024-02-06.ipynb | Tyler | | Revisiting age-class IPMs, and thinking about modularity of approach. |
| 2024-02-12.ipynb | Tyler | | Continued age-class IPM work, this time in more than one geo node. |
| 2024-03-01.ipynb | Tyler | | Getting the indices of IPM events and compartments by name with wildcard support. |
| 2024-03-13.ipynb | Tyler | | Showing off movement data collection (NEW!) |

## Contributing

Expand Down
88 changes: 59 additions & 29 deletions epymorph/engine/mm_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
Movement executor classes handle the logic for processing the movement step of the simulation.
"""
from abc import ABC, abstractmethod
from logging import DEBUG, Logger, getLogger

import numpy as np
from numpy.typing import NDArray

from epymorph.engine.context import RumeContext
from epymorph.engine.world import World
from epymorph.error import AttributeException, MmCompileException
from epymorph.event import (MovementEventsMixin, OnMovementClause,
OnMovementFinish, OnMovementStart)
from epymorph.movement.compile import compile_spec
from epymorph.movement.movement_model import (MovementModel, PredefParams,
TravelClause)
Expand All @@ -35,25 +36,24 @@ def apply(self, world: World, tick: Tick) -> None:
############################################################


class StandardMovementExecutor(MovementExecutor):
class StandardMovementExecutor(MovementEventsMixin, MovementExecutor):
"""The standard implementation of movement model execution."""

_ctx: RumeContext
_log: Logger
_model: MovementModel
_clause_masks: dict[TravelClause, NDArray[np.bool_]]
_predef: PredefParams = {}
_predef_hash: int | None = None

def __init__(self, ctx: RumeContext):
super().__init__()
# If we were given a MovementSpec, we need to compile it to get its clauses.
if isinstance(ctx.mm, MovementSpec):
self._model = compile_spec(ctx.mm, ctx.rng)
else:
self._model = ctx.mm

self._ctx = ctx
self._log = getLogger('movement')
self._clause_masks = {c: c.mask(ctx) for c in self._model.clauses}
self._check_predef()

Expand All @@ -76,11 +76,14 @@ def _check_predef(self) -> None:

def apply(self, world: World, tick: Tick) -> None:
"""Applies movement for this tick, mutating the world state."""
self._log.debug('Processing movement for day %s, step %s', tick.day, tick.step)

self.on_movement_start.publish(
OnMovementStart(tick.index, tick.day, tick.step))

self._check_predef()

# Process travel clauses.
total = 0
for clause in self._model.clauses:
if not clause.predicate(self._ctx, tick):
continue
Expand All @@ -89,10 +92,30 @@ def apply(self, world: World, tick: Tick) -> None:
returns = clause.returns(self._ctx, tick)
return_tick = self._ctx.resolve_tick(tick, returns)
world.apply_travel(travelers, return_tick)
total += travelers.sum()

# Process return clause.
return_movers = world.apply_return(tick)
self._log.getChild('Return').debug("moved %d", return_movers)
# return_requested, return_actual, return_total = world.apply_return(tick)
return_movers = world.apply_return(tick, return_stats=True)
if return_movers is not None:
return_total = return_movers.sum()
total += return_total

self.on_movement_clause.publish(
OnMovementClause(
tick.index,
tick.day,
tick.step,
"return",
return_movers,
return_movers,
return_total,
False,
)
)

self.on_movement_finish.publish(
OnMovementFinish(tick.index, tick.day, tick.step, total))

def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[SimDType]) -> NDArray[SimDType]:
"""
Expand All @@ -101,38 +124,35 @@ def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[Si
then selects exactly which individuals (by compartment) should move.
Returns an (N,N,C) array; from-source-to-destination-by-compartment.
"""
clause_log = self._log.getChild(clause.name)
_, N, C, _ = self._ctx.dim.TNCE

requested_movers = clause.requested(self._ctx, self._predef, tick)
np.fill_diagonal(requested_movers, 0)
requested_sum = requested_movers.sum(axis=1, dtype=SimDType)
clause_movers = clause.requested(self._ctx, self._predef, tick)
np.fill_diagonal(clause_movers, 0)
clause_sum = clause_movers.sum(axis=1, dtype=SimDType)

available_movers = local_cohorts * self._clause_masks[clause]
available_sum = available_movers.sum(axis=1, dtype=SimDType)

# If requested total is greater than the total available,
# If clause requested total is greater than the total available,
# use mvhg to select as many as possible.
throttled = False
for src in range(N):
if requested_sum[src] > available_sum[src]:
msg = "<WARNING> movement throttled for insufficient population at %d"
clause_log.debug(msg, src)
requested_movers[src, :] = self._ctx.rng.multivariate_hypergeometric(
colors=requested_movers[src, :],
nsample=available_sum[src]
)
throttled = True

# Update sum if it changed in the previous step.
if throttled:
if not np.any(clause_sum > available_sum):
throttled = False
requested_movers = clause_movers
requested_sum = clause_sum
else:
throttled = True
requested_movers = clause_movers.copy()
for src in range(N):
if clause_sum[src] > available_sum[src]:
requested_movers[src, :] = self._ctx.rng.multivariate_hypergeometric(
colors=requested_movers[src, :],
nsample=available_sum[src]
)
requested_sum = requested_movers.sum(axis=1, dtype=SimDType)

# The probability a mover from a src will go to a dst.
requested_prb = row_normalize(requested_movers, requested_sum, dtype=SimDType)

clause_log.debug("requested_movers:\n%s", requested_movers)

travelers_cs = np.zeros((N, N, C), dtype=SimDType)
for src in range(N):
if requested_sum[src] == 0:
Expand All @@ -151,7 +171,17 @@ def _travelers(self, clause: TravelClause, tick: Tick, local_cohorts: NDArray[Si
requested_prb[src, :]
).T.astype(SimDType)

if clause_log.isEnabledFor(DEBUG):
clause_log.debug("moved %d", requested_sum.sum())
self.on_movement_clause.publish(
OnMovementClause(
tick.index,
tick.day,
tick.step,
clause.name,
clause_movers,
travelers_cs,
requested_sum.sum(),
throttled,
)
)

return travelers_cs
33 changes: 22 additions & 11 deletions epymorph/engine/standard_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
from epymorph.error import (AttributeException, CompilationException,
InitException, IpmSimException, MmSimException,
ValidationException, error_gate)
from epymorph.event import (MovementEventsMixin, OnStart, OnTick,
SimulationEventsMixin)
from epymorph.geo.geo import Geo
from epymorph.initializer import DEFAULT_INITIALIZER, Initializer
from epymorph.movement.movement_model import MovementModel, validate_mm
from epymorph.movement.parser import MovementSpec
from epymorph.params import ContextParams, Params
from epymorph.simulation import (OnStart, SimDimensions, SimDType, SimTick,
SimulationEvents, TimeFrame)
from epymorph.util import Event
from epymorph.simulation import SimDimensions, SimDType, TimeFrame
from epymorph.util import Subscriber


@dataclass
Expand Down Expand Up @@ -88,13 +89,12 @@ def ticks_in_days(self) -> NDArray[np.float64]:
return np.cumsum(np.tile(self.dim.tau_step_lengths, self.dim.days), dtype=np.float64)


class StandardSimulation(SimulationEvents):
class StandardSimulation(SimulationEventsMixin, MovementEventsMixin):
"""Runs singular simulation passes, producing time-series output."""

_config: RumeConfig
_params: ContextParams | None = None
geo: Geo
on_tick: Event[SimTick] # this class supports on_tick; so narrow the type def

def __init__(self,
geo: Geo,
Expand All @@ -104,6 +104,9 @@ def __init__(self,
time_frame: TimeFrame,
initializer: Initializer | None = None,
rng: Callable[[], np.random.Generator] | None = None):
SimulationEventsMixin.__init__(self)
MovementEventsMixin.__init__(self)

self.geo = geo
if initializer is None:
initializer = DEFAULT_INITIALIZER
Expand All @@ -112,11 +115,6 @@ def __init__(self,

self._config = RumeConfig(geo, ipm, mm, params, time_frame, initializer, rng)

# events
self.on_start = Event()
self.on_tick = Event()
self.on_end = Event()

def validate(self) -> None:
"""Validate the simulation."""
with error_gate("validating the simulation", ValidationException, CompilationException):
Expand Down Expand Up @@ -148,11 +146,22 @@ def run(self) -> Output:
Run the simulation. It is safe to call this multiple times
to run multiple independent simulations with the same configuraiton.
"""
event_subs = Subscriber()

with error_gate("compiling the simulation", CompilationException):
ctx = RumeContext.from_config(self._config)
ipm_exec = StandardIpmExecutor(ctx)
movement_exec = StandardMovementExecutor(ctx)

# Proxy the movement_exec's events, if anyone is listening for them.
if MovementEventsMixin.has_subscribers(self):
event_subs.subscribe(movement_exec.on_movement_start,
self.on_movement_start.publish)
event_subs.subscribe(movement_exec.on_movement_clause,
self.on_movement_clause.publish)
event_subs.subscribe(movement_exec.on_movement_finish,
self.on_movement_finish.publish)

with error_gate("initializing the simulation", InitException):
ini = ctx.initialize()
world = ListWorld.from_initials(ini)
Expand All @@ -173,9 +182,11 @@ def run(self) -> Output:
out.prevalence[tick.index] = tick_prevalence

t = tick.index
self.on_tick.publish(SimTick(t, (t + 1) / ctx.dim.ticks))
self.on_tick.publish(OnTick(t, (t + 1) / ctx.dim.ticks))

self.on_end.publish(None)

event_subs.unsubscribe()
return out


Expand Down
2 changes: 1 addition & 1 deletion epymorph/engine/test/world_list_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def test_return(self):
Cohort(np.array([19]), 0, 2),
]])

world.apply_return(Tick(1, 1, date(2023, 1, 1), 0, 1.0))
world.apply_return(Tick(1, 1, date(2023, 1, 1), 0, 1.0), return_stats=False)

self.assertWorld(
world.locations,
Expand Down
4 changes: 3 additions & 1 deletion epymorph/engine/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None:
"""

@abstractmethod
def apply_return(self, tick: Tick) -> int:
def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None:
"""
Modify the world state as a result of returning all movers that are ready to be returned home.
If `return_stats` is True, returns an NxNxC array containing the individuals moved during the return.
Otherwise returns None.
"""
13 changes: 8 additions & 5 deletions epymorph/engine/world_hypercube.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,14 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None:
self.ledger[return_tick + 1, :, :, :] += travelers
self.time_frontier = max(self.time_frontier, return_tick + 2)

def apply_return(self, tick: Tick) -> int:
total = self.ledger[self.time_offset + 1, :, :, :].sum(dtype=SimDType)
movers = self.ledger[self.time_offset + 1, :, :, :].sum(
def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None:
# we have to transpose the movers "stats" result since they're being stored here as
# (home, visiting) and our result needs to be
# (moving from "visiting", moving to "home")
movers = self.ledger[self.time_offset + 1, :, :, :].transpose((1, 0, 2)).copy()
movers_by_home = self.ledger[self.time_offset + 1, :, :, :].sum(
axis=1, dtype=SimDType) * self._ident
self.ledger[self.time_offset + 1, :, :, :] = movers + \
self.ledger[self.time_offset + 1, :, :, :] = movers_by_home + \
self.ledger[self.time_offset, :, :, :]
self.time_offset += 1 # assumes there's only ever one return clause per tick
return total
return movers if return_stats else None
26 changes: 17 additions & 9 deletions epymorph/engine/world_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""World implementation: ListWorld."""
from operator import attrgetter
from typing import Iterable, Self
from typing import Any, Iterable, Self

import numpy as np
from numpy.typing import NDArray
Expand Down Expand Up @@ -64,6 +64,7 @@ class ListWorld(World):
"""The value of a population's `return_tick` when the population is home."""

nodes: int
compartments: int
locations: list[list[Cohort]]

@classmethod
Expand All @@ -79,6 +80,7 @@ def from_initials(cls, initial_compartments: NDArray[SimDType]) -> Self:

def __init__(self, locations: list[list[Cohort]]):
self.nodes = len(locations)
self.compartments = len(locations[0][0].compartments)
self.locations = locations

def normalize(self) -> None:
Expand Down Expand Up @@ -133,22 +135,28 @@ def apply_travel(self, travelers: NDArray[SimDType], return_tick: int) -> None:

self.normalize()

def apply_return(self, tick: Tick) -> int:
total_movers = 0
def apply_return(self, tick: Tick, *, return_stats: bool) -> NDArray[SimDType] | None:
movers: Any = None
if return_stats:
size = (self.nodes, self.nodes, self.compartments)
movers = np.zeros(size, dtype=SimDType)

next_locations = [[locals] for locals in self.get_local_cohorts()]
for loc_index, loc in enumerate(self.locations):
for cohort in loc:
for i, location in enumerate(self.locations):
for cohort in location:
if cohort.return_tick == ListWorld.HOME_TICK:
# locals are already where they need to be
continue
elif cohort.return_tick == tick.index:
# cohort ready to go home, merge with locals
next_locations[cohort.return_location][0].compartments += cohort.compartments
total_movers += cohort.compartments.sum()
j = cohort.return_location
next_locations[j][0].compartments += cohort.compartments
if return_stats:
movers[i, j, :] = cohort.compartments
else:
# cohort staying
next_locations[loc_index].append(cohort)
next_locations[i].append(cohort)

self.locations = next_locations
self.normalize()
return total_movers
return movers if return_stats else None
Loading

0 comments on commit 509e215

Please sign in to comment.