From f2afa9f85d86d38cd7cfd977bb030f64887f0769 Mon Sep 17 00:00:00 2001 From: Tyler Coles Date: Wed, 14 Aug 2024 18:05:01 -0700 Subject: [PATCH] MM classes implementation. --- epymorph/data/__init__.py | 1 - epymorph/data/mm/centroids.movement | 25 -- epymorph/data/mm/centroids.py | 60 ++++ epymorph/data/mm/flat.movement | 27 -- epymorph/data/mm/flat.py | 53 +++ epymorph/data/mm/icecube.movement | 18 - epymorph/data/mm/icecube.py | 43 +++ epymorph/data/mm/no.movement | 10 - epymorph/data/mm/no.py | 29 ++ epymorph/data/mm/pei.movement | 53 --- epymorph/data/mm/pei.py | 88 +++++ epymorph/data/mm/sparsemod.movement | 32 -- epymorph/data/mm/sparsemod.py | 65 ++++ epymorph/data/registry.py | 57 ++- epymorph/movement/__init__.py | 0 epymorph/movement/compile.py | 336 ------------------ epymorph/movement/movement_model.py | 134 ------- epymorph/movement/parser.py | 226 ------------ epymorph/movement/parser_util.py | 186 ---------- epymorph/movement/test/__init__.py | 0 epymorph/movement/test/compile_test.py | 57 --- epymorph/movement/test/parser_test.py | 111 ------ epymorph/movement_model.py | 276 ++++++++++++++ epymorph/rume.py | 152 ++++---- epymorph/simulation.py | 114 ++++-- epymorph/simulator/basic/basic_simulator.py | 4 +- epymorph/simulator/basic/mm_exec.py | 141 +++----- .../basic/test/basic_simulator_test.py | 4 +- epymorph/simulator/data.py | 12 +- epymorph/simulator/test/data_test.py | 2 +- epymorph/test/movement_model_test.py | 163 +++++++++ epymorph/test/rume_test.py | 95 +++-- 32 files changed, 1085 insertions(+), 1489 deletions(-) delete mode 100644 epymorph/data/mm/centroids.movement create mode 100644 epymorph/data/mm/centroids.py delete mode 100644 epymorph/data/mm/flat.movement create mode 100644 epymorph/data/mm/flat.py delete mode 100644 epymorph/data/mm/icecube.movement create mode 100644 epymorph/data/mm/icecube.py delete mode 100644 epymorph/data/mm/no.movement create mode 100644 epymorph/data/mm/no.py delete mode 100644 
epymorph/data/mm/pei.movement create mode 100644 epymorph/data/mm/pei.py delete mode 100644 epymorph/data/mm/sparsemod.movement create mode 100644 epymorph/data/mm/sparsemod.py delete mode 100644 epymorph/movement/__init__.py delete mode 100644 epymorph/movement/compile.py delete mode 100644 epymorph/movement/movement_model.py delete mode 100644 epymorph/movement/parser.py delete mode 100644 epymorph/movement/parser_util.py delete mode 100644 epymorph/movement/test/__init__.py delete mode 100644 epymorph/movement/test/compile_test.py delete mode 100644 epymorph/movement/test/parser_test.py create mode 100644 epymorph/movement_model.py create mode 100644 epymorph/test/movement_model_test.py diff --git a/epymorph/data/__init__.py b/epymorph/data/__init__.py index 6b4a3e5b..c859c51f 100644 --- a/epymorph/data/__init__.py +++ b/epymorph/data/__init__.py @@ -7,5 +7,4 @@ 'geo_library_static', 'ipm_library', 'mm_library', - 'mm_library_parsed', ] diff --git a/epymorph/data/mm/centroids.movement b/epymorph/data/mm/centroids.movement deleted file mode 100644 index a9715d2e..00000000 --- a/epymorph/data/mm/centroids.movement +++ /dev/null @@ -1,25 +0,0 @@ -[move-steps: per-day=2; duration=[1/3, 2/3]] - -[attrib: name=population; type=int; shape=N; default_value=None; - comment="The total population at each node."] - -[attrib: name=centroid; type=[(longitude, float), (latitude, float)]; shape=N; default_value=None; - comment="The centroids for each node as (longitude, latitude) tuples."] - -[attrib: name=phi; type=float; shape=S; default_value=40.0; - comment="Influences the distance that movers tend to travel."] - -[predef: function = -def centroids_movement(): - centroid = data['centroid'] - distance = pairwise_haversine(centroid['longitude'], centroid['latitude']) - dispersal_kernel = row_normalize(1 / np.exp(distance / data['phi'])) - return { 'dispersal_kernel': dispersal_kernel } -] - -# Commuter movement: assume 10% of the population are commuters -[mtype: days=all; 
leave=1; duration=0d; return=2; function= -def centroids_commuters(t): - n_commuters = np.floor(data['population'] * 0.1).astype(SimDType) - return np.multinomial(n_commuters, predef['dispersal_kernel']) -] diff --git a/epymorph/data/mm/centroids.py b/epymorph/data/mm/centroids.py new file mode 100644 index 00000000..a48d3348 --- /dev/null +++ b/epymorph/data/mm/centroids.py @@ -0,0 +1,60 @@ +from functools import cached_property + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_shape import Shapes +from epymorph.data_type import CentroidType, SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import AttributeDef, Tick, TickDelta, TickIndex +from epymorph.util import pairwise_haversine, row_normalize + + +class CentroidsClause(MovementClause): + """The clause of the centroids model.""" + requirements = ( + AttributeDef('population', int, Shapes.N, + comment="The total population at each node."), + AttributeDef('centroid', CentroidType, Shapes.N, + comment="The centroids for each node as (longitude, latitude) tuples."), + AttributeDef('phi', float, Shapes.S, default_value=40.0, + comment="Influences the distance that movers tend to travel."), + AttributeDef('commuter_proportion', float, Shapes.S, default_value=0.1, + comment="Decides what proportion of the total population should be commuting normally.") + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + @cached_property + def dispersal_kernel(self) -> NDArray[np.float64]: + """ + The NxN matrix or dispersal kernel describing the tendency for movers to move to a particular location. + In this model, the kernel is: + 1 / e ^ (distance / phi) + which is then row-normalized. 
+ """ + centroid = self.data('centroid') + phi = self.data('phi') + distance = pairwise_haversine(centroid['longitude'], centroid['latitude']) + return row_normalize(1 / np.exp(distance / phi)) + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + pop = self.data('population') + comm_prop = self.data('commuter_proportion') + n_commuters = np.floor(pop * comm_prop).astype(SimDType) + return self.rng.multinomial(n_commuters, self.dispersal_kernel) + + +@registry.mm('centroids') +class Centroids(MovementModel): + """ + The centroids MM describes a basic commuter movement where a fixed proportion + of the population commutes every day, travels to another location for 1/3 of a day + (with a location likelihood that decreases with distance), and then returns home for + the remaining 2/3 of the day. + """ + steps = (1 / 3, 2 / 3) + clauses = (CentroidsClause(),) diff --git a/epymorph/data/mm/flat.movement b/epymorph/data/mm/flat.movement deleted file mode 100644 index 5bcdea2a..00000000 --- a/epymorph/data/mm/flat.movement +++ /dev/null @@ -1,27 +0,0 @@ -# This model evenly weights the probability of movement to all other nodes. -# It uses parameter 'commuter_proportion' to determine how many people should -# be moving, based on the total normal population of each node. - -[move-steps: per-day=2; duration=[1/3, 2/3]] - -[attrib: name=population; type=int; shape=N; default_value=None; - comment="The total population at each node."] - -[attrib: name=commuter_proportion; type=float; shape=S; default_value=0.2; - comment="The ratio of the total population that is assumed to be commuters."] - -[predef: function= -def flat_predef(): - ones = np.ones((dim.nodes, dim.nodes)) - np.fill_diagonal(ones, 0) - dispersal_kernel = row_normalize(ones) - return { 'dispersal_kernel': dispersal_kernel } -] - -# Assume a percentage of the population move around, -# evenly weighted to all other locations. 
-[mtype: days=all; leave=1; duration=0d; return=2; function= -def flat_movement(t): - n_commuters = np.floor(data['population'] * data['commuter_proportion']).astype(SimDType) - return np.multinomial(n_commuters, predef['dispersal_kernel']) -] diff --git a/epymorph/data/mm/flat.py b/epymorph/data/mm/flat.py new file mode 100644 index 00000000..73aed334 --- /dev/null +++ b/epymorph/data/mm/flat.py @@ -0,0 +1,53 @@ +from functools import cached_property + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_shape import Shapes +from epymorph.data_type import SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import AttributeDef, Tick, TickDelta, TickIndex +from epymorph.util import row_normalize + + +class FlatClause(MovementClause): + """The clause of the flat model.""" + requirements = ( + AttributeDef('population', int, Shapes.N, + comment="The total population at each node."), + AttributeDef('commuter_proportion', float, Shapes.S, default_value=0.1, + comment="Decides what proportion of the total population should be commuting normally.") + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + @cached_property + def dispersal_kernel(self) -> NDArray[np.float64]: + """ + The NxN matrix or dispersal kernel describing the tendency for movers to move to a particular location. + In this model, the kernel is full of 1s except for 0s on the diagonal, which is then row-normalized. + Effectively: every destination is equally likely. 
+ """ + ones = np.ones((self.dim.nodes, self.dim.nodes)) + np.fill_diagonal(ones, 0) + return row_normalize(ones) + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + pop = self.data('population') + comm_prop = self.data('commuter_proportion') + n_commuters = np.floor(pop * comm_prop).astype(SimDType) + return self.rng.multinomial(n_commuters, self.dispersal_kernel) + + +@registry.mm('flat') +class Flat(MovementModel): + """ + This model evenly weights the probability of movement to all other nodes. + It uses parameter 'commuter_proportion' to determine how many people should + be moving, based on the total normal population of each node. + """ + steps = (1 / 3, 2 / 3) + clauses = (FlatClause(),) diff --git a/epymorph/data/mm/icecube.movement b/epymorph/data/mm/icecube.movement deleted file mode 100644 index 70d90411..00000000 --- a/epymorph/data/mm/icecube.movement +++ /dev/null @@ -1,18 +0,0 @@ -# A toy example: ice cube tray movement movement model -# Each state sends a fixed number of commuters to the next -# state in the line (without wraparound). 
- -[move-steps: per-day=2; duration=[2/3, 1/3]] - -[attrib: name=population; type=int; shape=N; default_value=None; - comment="The total population at each node."] - -[mtype: days=all; leave=1; duration=0d; return=2; function= -def icecube(t, src): - # using `zeros_like` here is just a convenient way - # to make an empty integer array of the correct length - commuters = np.zeros_like(data['population']) - if (src + 1) < commuters.size: - commuters[src + 1] = 5000 - return commuters -] diff --git a/epymorph/data/mm/icecube.py b/epymorph/data/mm/icecube.py new file mode 100644 index 00000000..bff28c4e --- /dev/null +++ b/epymorph/data/mm/icecube.py @@ -0,0 +1,43 @@ +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_shape import Shapes +from epymorph.data_type import SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import AttributeDef, Tick, TickDelta, TickIndex + + +class IcecubeClause(MovementClause): + """The clause of the icecube model.""" + requirements = ( + AttributeDef('population', int, Shapes.N, + comment="The total population at each node."), + AttributeDef('commuter_proportion', float, Shapes.S, default_value=0.1, + comment="Decides what proportion of the total population should be commuting normally.") + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + N = self.dim.nodes + pop = self.data('population') + comm_prop = self.data('commuter_proportion') + commuters = np.zeros((N, N), dtype=SimDType) + for src in range(N): + if (src + 1) < N: + commuters[src, src + 1] = pop[src] * comm_prop + return commuters + + +@registry.mm('icecube') +class Icecube(MovementModel): + """ + A toy example: ice cube tray movement model + Each state sends a fixed number of commuters to the next + state in the line (without wraparound). 
+ """ + steps = (1 / 2, 1 / 2) + clauses = (IcecubeClause(),) diff --git a/epymorph/data/mm/no.movement b/epymorph/data/mm/no.movement deleted file mode 100644 index 7c0ec44e..00000000 --- a/epymorph/data/mm/no.movement +++ /dev/null @@ -1,10 +0,0 @@ -# No movement at all. -# Obviously this isn't the most-efficient-imaginable way to accomplish this, -# but this way we don't have to write any new code, so here we are. - -[move-steps: per-day=1; duration=[1]] - -[mtype: days=all; leave=1; duration=0d; return=1; function= -def no(t, src, dst): - return 0 -] diff --git a/epymorph/data/mm/no.py b/epymorph/data/mm/no.py new file mode 100644 index 00000000..8940fd69 --- /dev/null +++ b/epymorph/data/mm/no.py @@ -0,0 +1,29 @@ +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_type import SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import Tick, TickDelta, TickIndex + + +class NoClause(MovementClause): + """The clause of the "no" model.""" + requirements = () + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=0, days=0) + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + N = self.dim.nodes + return np.zeros((N, N), dtype=SimDType) + + +@registry.mm('no') +class No(MovementModel): + """ + No movement at all. This is handy for cases when you want to disable movement + in an experiment, or for testing. + """ + steps = (1.0,) + clauses = (NoClause(),) diff --git a/epymorph/data/mm/pei.movement b/epymorph/data/mm/pei.movement deleted file mode 100644 index 79ce9c80..00000000 --- a/epymorph/data/mm/pei.movement +++ /dev/null @@ -1,53 +0,0 @@ -# Modeled after the Pei influenza paper, this model simulates -# two types of movers -- regular commuters and more-randomized dispersers. -# Each is somewhat stochastic but adhere to the general shape dictated -# by the commuters array. 
Both kinds of movement can be "tuned" through -# their respective parameters: move_control and theta. - -[move-steps: per-day=2; duration=[1/3, 2/3]] - -[attrib: name=commuters; type=int; shape=NxN; default_value=None; - comment="A node-to-node commuters matrix."] - -[attrib: name=move_control; type=float; shape=S; default_value=0.9; - comment="A factor which modulates the number of commuters by conducting a binomial draw with this probability and the expected commuters from the commuters matrix."] - -[attrib: name=theta; type=float; shape=S; default_value=0.1; - comment="A factor which allows for randomized movement by conducting a poisson draw with this factor times the average number of commuters between two nodes from the commuters matrix."] - -[predef: function = -def pei_movement(): - """Pei style movement pre definition""" - commuters = data['commuters'] - - # Average commuters between locations. - commuters_average = (commuters + commuters.T) // 2 - - # Total commuters living in each state. - commuters_by_node = np.sum(commuters, axis=1) - - # Commuters as a ratio to the total commuters living in that state. 
- # For cases where there are no commuters, avoid div-by-0 errors - commuting_probability = row_normalize(commuters) - - return { - 'commuters_average': commuters_average, - 'commuters_by_node': commuters_by_node, - 'commuting_probability': commuting_probability - } -] - -# Commuter movement -[mtype: days=all; leave=1; duration=0d; return=2; function= -def commuters(t): - typical = predef['commuters_by_node'] - actual = np.binomial(typical, data['move_control']) - return np.multinomial(actual, predef['commuting_probability']) -] - -# Random dispersers movement -[mtype: days=all; leave=1; duration=0d; return=2; function= -def dispersers(t): - avg = predef['commuters_average'] - return np.poisson(avg * data['theta']) -] diff --git a/epymorph/data/mm/pei.py b/epymorph/data/mm/pei.py new file mode 100644 index 00000000..d7fce5d6 --- /dev/null +++ b/epymorph/data/mm/pei.py @@ -0,0 +1,88 @@ +from functools import cached_property + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_shape import Shapes +from epymorph.data_type import SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import AttributeDef, Tick, TickDelta, TickIndex +from epymorph.util import row_normalize + +_COMMUTERS_ATTRIB = AttributeDef('commuters', int, Shapes.NxN, + comment="A node-to-node commuters matrix.") + + +class Commuters(MovementClause): + """The commuter clause of the pei model.""" + + requirements = ( + _COMMUTERS_ATTRIB, + AttributeDef('move_control', float, Shapes.S, default_value=0.9, + comment="A factor which modulates the number of commuters by conducting a binomial draw " + "with this probability and the expected commuters from the commuters matrix."), + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + @cached_property + def commuters_by_node(self) -> NDArray[SimDType]: + """Total commuters living in each state.""" 
+ commuters = self.data('commuters') + return np.sum(commuters, axis=1) + + @cached_property + def commuting_probability(self) -> NDArray[np.float64]: + """ + Commuters as a ratio to the total commuters living in that state. + For cases where there are no commuters, avoid div-by-0 errors + """ + commuters = self.data('commuters') + return row_normalize(commuters) + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + move_control = self.data('move_control') + actual = self.rng.binomial(self.commuters_by_node, move_control) + return self.rng.multinomial(actual, self.commuting_probability) + + +class Dispersers(MovementClause): + """The dispersers clause of the pei model.""" + + requirements = ( + _COMMUTERS_ATTRIB, + AttributeDef('theta', float, Shapes.S, default_value=0.1, + comment="A factor which allows for randomized movement by conducting a poisson draw " + "with this factor times the average number of commuters between two nodes " + "from the commuters matrix."), + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + @cached_property + def commuters_average(self) -> NDArray[SimDType]: + """Average commuters between locations.""" + commuters = self.data('commuters') + return (commuters + commuters.T) // 2 + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + theta = self.data('theta') + return self.rng.poisson(theta * self.commuters_average) + + +@registry.mm('pei') +class Pei(MovementModel): + """ + Modeled after the Pei influenza paper, this model simulates + two types of movers -- regular commuters and more-randomized dispersers. + Each is somewhat stochastic but adhere to the general shape dictated + by the commuters array. Both kinds of movement can be "tuned" through + their respective parameters: move_control and theta. 
+ """ + steps = (1 / 3, 2 / 3) + clauses = (Commuters(), Dispersers()) diff --git a/epymorph/data/mm/sparsemod.movement b/epymorph/data/mm/sparsemod.movement deleted file mode 100644 index b0ebeae7..00000000 --- a/epymorph/data/mm/sparsemod.movement +++ /dev/null @@ -1,32 +0,0 @@ -# Modeled after the SPARSEMOD COVID-19 paper, this model simulates -# movement using a distance kernel parameterized by phi, and using a commuters -# matrix to determine the total expected number of commuters. - -[move-steps: per-day=2; duration=[1/3, 2/3]] - -[attrib: name=centroid; type=[(longitude, float), (latitude, float)]; shape=N; default_value=None; - comment="The centroids for each node as (longitude, latitude) tuples."] - -[attrib: name=commuters; type=int; shape=NxN; default_value=None; - comment="A node-to-node commuters matrix."] - -[attrib: name=phi; type=float; shape=S; default_value=40.0; - comment="Influences the distance that movers tend to travel."] - -[predef: function = -def sparsemod_predef(): - centroid = data['centroid'] - distance = pairwise_haversine(centroid['longitude'], centroid['latitude']) - dispersal_kernel = row_normalize(1 / np.exp(distance / data['phi'])) - return { - 'commuters_by_node': np.sum(data['commuters'], axis=1), - 'dispersal_kernel': dispersal_kernel - } -] - -# Commuter movement -[mtype: days=all; leave=1; duration=0d; return=2; function= -def sparsemod_commuters(t): - n_commuters = predef['commuters_by_node'] - return np.multinomial(n_commuters, predef['dispersal_kernel']) -] diff --git a/epymorph/data/mm/sparsemod.py b/epymorph/data/mm/sparsemod.py new file mode 100644 index 00000000..05ba2989 --- /dev/null +++ b/epymorph/data/mm/sparsemod.py @@ -0,0 +1,65 @@ +from functools import cached_property + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data import registry +from epymorph.data_shape import Shapes +from epymorph.data_type import CentroidType, SimDType +from epymorph.movement_model import EveryDay, 
MovementClause, MovementModel +from epymorph.simulation import AttributeDef, Tick, TickDelta, TickIndex +from epymorph.util import pairwise_haversine, row_normalize + + +class SparsemodClause(MovementClause): + """The clause of the sparsemod model.""" + requirements = ( + AttributeDef('commuters', int, Shapes.NxN, + comment="A node-to-node commuters matrix."), + AttributeDef('centroid', CentroidType, Shapes.N, + comment="The centroids for each node as (longitude, latitude) tuples."), + AttributeDef('phi', float, Shapes.S, default_value=40.0, + comment="Influences the distance that movers tend to travel."), + ) + + predicate = EveryDay() + leaves = TickIndex(step=0) + returns = TickDelta(step=1, days=0) + + @cached_property + def commuters_by_node(self) -> NDArray[SimDType]: + """ + The number of commuters that live in any particular node + (regardless of typical commuting destination). + """ + return np.sum(self.data('commuters'), axis=1) + + @cached_property + def dispersal_kernel(self) -> NDArray[np.float64]: + """ + The NxN matrix or dispersal kernel describing the tendency for movers to move to a particular location. + In this model, the kernel is: + 1 / e ^ (distance / phi) + which is then row-normalized. + """ + centroid = self.data('centroid') + phi = self.data('phi') + distance = pairwise_haversine(centroid['longitude'], centroid['latitude']) + return row_normalize(1 / np.exp(distance / phi)) + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + return self.rng.multinomial( + self.commuters_by_node, + self.dispersal_kernel + ) + + +@registry.mm('sparsemod') +class Sparsemod(MovementModel): + """ + Modeled after the SPARSEMOD COVID-19 paper, this model simulates + movement using a distance kernel parameterized by phi, and using a commuters + matrix to determine the total expected number of commuters. 
+ """ + steps = (1 / 3, 2 / 3) + clauses = (SparsemodClause(),) diff --git a/epymorph/data/registry.py b/epymorph/data/registry.py index 92156088..4c6fefb2 100644 --- a/epymorph/data/registry.py +++ b/epymorph/data/registry.py @@ -5,7 +5,7 @@ from importlib.abc import Traversable from importlib.resources import as_file, files from inspect import isclass, signature -from typing import Callable, Mapping, NamedTuple, TypeGuard, TypeVar, cast +from typing import Callable, Mapping, NamedTuple, TypeGuard, TypeVar from epymorph.compartment_model import CompartmentModel from epymorph.error import ModelRegistryException @@ -13,7 +13,7 @@ from epymorph.geo.dynamic import DynamicGeo, DynamicGeoFileOps from epymorph.geo.geo import Geo from epymorph.geo.static import StaticGeo, StaticGeoFileOps -from epymorph.movement.parser import MovementSpec, parse_movement_spec +from epymorph.movement_model import MovementModel from epymorph.util import as_sorted_dict ModelT = TypeVar('ModelT') @@ -36,14 +36,17 @@ def annotation(self) -> str: """Get the ID annotation name for this model type.""" return f'__epymorph_{self.name}_id__' - def get_model_id(self, func: Callable) -> str | None: + def get_model_id(self, obj: object) -> str: """Retrieves the tagged model ID.""" - value = getattr(func, self.annotation, None) - return value if isinstance(value, str) else None + value = getattr(obj, self.annotation, None) + if isinstance(value, str): + return value + else: + raise ValueError("Unable to load model ID during model registry.") - def set_model_id(self, func: Callable, model_id: str) -> None: + def set_model_id(self, obj: object, model_id: str) -> None: """Sets a model ID as a tag.""" - setattr(func, self.annotation, model_id) + setattr(obj, self.annotation, model_id) GEO_REGISTRY = _ModelRegistryInfo('geo') @@ -56,17 +59,17 @@ def set_model_id(self, func: Callable, model_id: str) -> None: def ipm(ipm_id: str): """Decorates an IPM loader so we can register it with the system.""" - def 
make_decorator(func: LoaderFunc[CompartmentModel]) -> LoaderFunc[CompartmentModel]: - IPM_REGISTRY.set_model_id(func, ipm_id) - return func + def make_decorator(model: type[CompartmentModel]) -> type[CompartmentModel]: + IPM_REGISTRY.set_model_id(model, ipm_id) + return model return make_decorator def mm(mm_id: str): """Decorates an IPM loader so we can register it with the system.""" - def make_decorator(func: LoaderFunc[MovementSpec]) -> LoaderFunc[MovementSpec]: - MM_REGISTRY.set_model_id(func, mm_id) - return func + def make_decorator(model: type[MovementModel]) -> type[MovementModel]: + MM_REGISTRY.set_model_id(model, mm_id) + return model return make_decorator @@ -81,7 +84,7 @@ def make_decorator(func: LoaderFunc[Geo]) -> LoaderFunc[Geo]: # Discovery and loading utilities -DiscoverT = TypeVar('DiscoverT', bound=CompartmentModel | MovementSpec | StaticGeo) +DiscoverT = TypeVar('DiscoverT', bound=CompartmentModel | MovementModel | StaticGeo) def _discover(model: _ModelRegistryInfo, library_type: type[DiscoverT]) -> Library[DiscoverT]: @@ -115,7 +118,7 @@ def is_loader(func: Callable) -> TypeGuard[LoaderFunc[DiscoverT]]: ] return { - cast(str, model.get_model_id(x)): x + model.get_model_id(x): x for mod in modules for x in mod.__dict__.values() if callable(x) and x.__module__ == mod.__name__ and is_loader(x) @@ -138,22 +141,13 @@ def _discover_classes(model: _ModelRegistryInfo, library_type: type[DiscoverT]) ] return { - cast(str, model.get_model_id(x)): x + model.get_model_id(x): x for mod in modules for x in mod.__dict__.values() if isclass(x) and issubclass(x, library_type) and x.__module__ == mod.__name__ } -def _mm_spec_loader(mm_spec_file: Traversable) -> Callable[[], MovementSpec]: - """Returns a function to load the identified movement model.""" - def load() -> MovementSpec: - with as_file(mm_spec_file) as file: - spec_string = file.read_text(encoding="utf-8") - return parse_movement_spec(spec_string) - return load - - def _geo_spec_loader(geo_spec_file: 
Traversable) -> Callable[[], DynamicGeo]: """Returns a function to load the identified GEO (from spec).""" def load() -> DynamicGeo: @@ -170,7 +164,6 @@ def load() -> StaticGeo: return load -_MM_DIR = files(MM_REGISTRY.path) _GEO_DIR = files(GEO_REGISTRY.path) @@ -182,17 +175,9 @@ def load() -> StaticGeo: }) """All epymorph intra-population models (by id).""" -mm_library_parsed: Library[MovementSpec] = as_sorted_dict({ - # Auto-discover all .movement files in the data/mm path. - f.name.removesuffix('.movement'): _mm_spec_loader(f) - for f in _MM_DIR.iterdir() - if f.name.endswith('.movement') -}) -"""The subset of MMs that are parsed from movement files.""" -mm_library: Library[MovementSpec] = as_sorted_dict({ - **_discover(MM_REGISTRY, MovementSpec), - **mm_library_parsed, +mm_library: ClassLibrary[MovementModel] = as_sorted_dict({ + **_discover_classes(MM_REGISTRY, MovementModel), }) """All epymorph movement models (by id).""" diff --git a/epymorph/movement/__init__.py b/epymorph/movement/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/epymorph/movement/compile.py b/epymorph/movement/compile.py deleted file mode 100644 index 5264b572..00000000 --- a/epymorph/movement/compile.py +++ /dev/null @@ -1,336 +0,0 @@ -""" -Compilation of movement models. 
-""" -import ast -from functools import wraps -from typing import Any, Callable, Mapping, Protocol, Sequence - -import numpy as np -from numpy.typing import NDArray - -from epymorph.code import (ImmutableNamespace, compile_function, - epymorph_namespace, parse_function) -from epymorph.data_type import SimDType -from epymorph.error import AttributeException, MmCompileException, error_gate -from epymorph.movement.movement_model import (DynamicTravelClause, - MovementContext, - MovementFunction, MovementModel, - PredefData, TravelClause) -from epymorph.movement.parser import (ALL_DAYS, DailyClause, MovementClause, - MovementSpec) -from epymorph.simulation import AttributeDef, Tick, TickDelta -from epymorph.util import identity - - -def _empty_predef(_ctx: MovementContext) -> PredefData: - """A placeholder predef function for when none is given by the movement spec.""" - return {} - - -def compile_spec( - spec: MovementSpec, - rng: np.random.Generator, - name_override: Callable[[str], str] = identity, -) -> MovementModel: - """ - Compile a movement model from a spec. Requires a reference to the random number generator - that will be used to execute the movement model. - By default, clauses will be given a name from the spec file, but you can override - that naming behavior by providing the `name_override` function. - """ - with error_gate("compiling the movement model", MmCompileException, AttributeException): - # Prepare a namespace within which to execute our movement functions. - global_namespace = _movement_global_namespace(rng) - - # Compile predef (if any). 
- if spec.predef is None: - predef_f = _empty_predef - else: - orig_ast = parse_function(spec.predef.function) - transformer = PredefFunctionTransformer(spec.attributes) - trns_ast = transformer.visit_and_fix(orig_ast) - predef_f = compile_function(trns_ast, global_namespace) - - return MovementModel( - tau_steps=spec.steps.step_lengths, - attributes=spec.attributes, - predef=predef_f, - clauses=[_compile_clause(c, spec.attributes, global_namespace, name_override) - for c in spec.clauses] - ) - - -def _movement_global_namespace(rng: np.random.Generator) -> dict[str, Any]: - """Make a safe namespace for user-defined movement functions.""" - def as_simdtype(func): - @wraps(func) - def wrapped_func(*args, **kwargs): - result = func(*args, **kwargs) - if np.isscalar(result): - return SimDType(result) # type: ignore - else: - return result.astype(SimDType) - return wrapped_func - - global_namespace = epymorph_namespace(SimDType) - # Add rng functions to np namespace. - np_ns = ImmutableNamespace({ - **global_namespace['np'].to_dict_shallow(), - 'poisson': as_simdtype(rng.poisson), - 'binomial': as_simdtype(rng.binomial), - 'multinomial': as_simdtype(rng.multinomial) - }) - # Add simulation details. - global_namespace |= { - 'MovementContext': MovementContext, - 'PredefData': PredefData, - 'np': np_ns, - } - return global_namespace - - -def _compile_clause( - clause: MovementClause, - model_attributes: Sequence[AttributeDef], - global_namespace: dict[str, Any], - name_override: Callable[[str], str] = identity, -) -> TravelClause: - """Compiles a movement clause in a given namespace.""" - # Parse AST for the function. - try: - orig_ast = parse_function(clause.function) - transformer = ClauseFunctionTransformer(model_attributes) - fn_ast = transformer.visit_and_fix(orig_ast) - fn = compile_function(fn_ast, global_namespace) - except MmCompileException as e: - raise e - except Exception as e: - msg = "Unable to parse and compile movement clause function." 
- raise MmCompileException(msg) from e - - # Handle different types of MovementClause. - match clause: - case DailyClause(): - clause_weekdays = set( - i for (i, d) in enumerate(ALL_DAYS) - if d in clause.days - ) - - def move_predicate(_ctx: MovementContext, tick: Tick) -> bool: - return clause.leave_step == tick.step and \ - tick.date.weekday() in clause_weekdays - - def returns(_ctx: MovementContext, _tick: Tick) -> TickDelta: - return TickDelta( - days=clause.duration.to_days(), - step=clause.return_step - ) - - return DynamicTravelClause( - name=name_override(fn_ast.name), - move_predicate=move_predicate, - requested=_adapt_move_function(fn, fn_ast), - returns=returns - ) - - -def _adapt_move_function(fn: Callable, fn_ast: ast.FunctionDef) -> MovementFunction: - """ - Wrap the user-provided function in order to handle functions of different arity. - Movement functions as specified by the user can have signature: - f(tick); f(tick, src); or f(tick, src, dst). - """ - match len(fn_ast.args.args): - # Remember `fn` has been transformed, so if the user gave 1 arg we added 1 for a total of 2. 
- case 2: - @wraps(fn) - def fn_arity1(ctx: MovementContext, tick: Tick) -> NDArray[SimDType]: - requested = fn(ctx, tick) - np.fill_diagonal(requested, 0) - return requested - return fn_arity1 - - case 3: - @wraps(fn) - def fn_arity2(ctx: MovementContext, tick: Tick) -> NDArray[SimDType]: - N = ctx.dim.nodes - requested = np.zeros((N, N), dtype=SimDType) - for n in range(N): - requested[n, :] = fn(ctx, tick, n) - np.fill_diagonal(requested, 0) - return requested - return fn_arity2 - - case 4: - @wraps(fn) - def fn_arity3(ctx: MovementContext, tick: Tick) -> NDArray[SimDType]: - N = ctx.dim.nodes - requested = np.zeros((N, N), dtype=SimDType) - for i, j in np.ndindex(N, N): - requested[i, j] = fn(ctx, tick, i, j) - np.fill_diagonal(requested, 0) - return requested - return fn_arity3 - - case invalid_num_args: - msg = f"Movement clause '{fn_ast.name}' has an invalid number of arguments ({invalid_num_args})" - raise MmCompileException(msg) - - -# Code transformers - -class HasLineNo(Protocol): - lineno: int - - -class _MovementCodeTransformer(ast.NodeTransformer): - """ - This class defines the logic that can be shared between Predef and Clause function - transformers. Some functionality might be more than is technically necessary for either - case, but only if that extra functionality is effectively harmless. - """ - - check_attributes: bool - attributes: Mapping[str, AttributeDef] - - def __init__(self, attributes: Sequence[AttributeDef]): - # NOTE: for the sake of backwards compatibility, MovementModel attribute declarations - # are optional; so our approach will be that attributes will only be checked if at least - # one attribute declaration is provided. 
- if len(attributes) == 0: - self.check_attributes = False - self.attributes = {} - else: - self.check_attributes = True - self.attributes = {a.name: a for a in attributes} - - def _report_line(self, node: HasLineNo): - return f"Line: {node.lineno}" - - def visit_Subscript(self, node: ast.Subscript) -> Any: - """Modify references to data and predef pseudo-dictionaries.""" - - if isinstance(node.value, ast.Name) and isinstance(node.slice, ast.Constant) and node.value.id in ['data', 'predef']: - source = node.value.id - attr_name = node.slice.value - - # Check data attributes against declarations (but ignore predefs). - if self.check_attributes and source == 'data' and attr_name not in self.attributes: - msg = f"Movement model is using an undeclared attribute: `data[{attr_name}]`. "\ - f"Please add a suitable attribute declaration. ({self._report_line(node)})" - raise MmCompileException(msg) - - # NOTE: what we are *NOT* doing is checking if usage of predef attributes are - # actually provided by the predef function. Doing this at compile time would be - # exceedingly difficult, as we'd have to scrape and analyze all code that contributes to - # the returned dictionary's keys. In simple cases this might be straight-forward, but not - # in the general case. For the time being, this will remain a simulation-time error. - - # Rewrite to access via context resolver. 
- return ast.Call( - func=ast.Attribute( - value=ast.Attribute( - value=ast.Name(id='ctx', ctx=ast.Load()), - attr='data', - ctx=ast.Load(), - ), - attr='resolve_name', - ctx=ast.Load(), - ), - args=[node.slice], - keywords=[], - ) - - return self.generic_visit(node) - - def visit_Attribute(self, node: ast.Attribute) -> Any: - """Modify references to objects that should be in context.""" - if isinstance(node.value, ast.Name) and node.value.id in ['dim']: - node.value = ast.Attribute( - value=ast.Name(id='ctx', ctx=ast.Load()), - attr=node.value.id, - ctx=ast.Load(), - ) - return node - return self.generic_visit(node) - - def visit_and_fix(self, node: ast.AST) -> Any: - """ - Shortcut for visiting the node and then running - ast.fix_missing_locations() on the result before returning it. - """ - transformed = self.visit(node) - ast.fix_missing_locations(transformed) - return transformed - - -class PredefFunctionTransformer(_MovementCodeTransformer): - """ - Transforms movement model predef code. This is the dual of - ClauseFunctionTransformer (below; see that for additional description), - but specialized for predef which is similar but slightly different. - Most importantly, this transforms the function signature to have the context - as the first parameter. - """ - - def _report_line(self, node: HasLineNo): - return f"predef line: {node.lineno}" - - def visit_FunctionDef(self, node: ast.FunctionDef) -> Any: - """Modify function parameters.""" - new_node = self.generic_visit(node) - if isinstance(new_node, ast.FunctionDef): - ctx_arg = ast.arg( - arg='ctx', - annotation=ast.Name(id='MovementContext', ctx=ast.Load()), - ) - new_node.args.args = [ctx_arg, *new_node.args.args] - return new_node - - -class ClauseFunctionTransformer(_MovementCodeTransformer): - """ - Transforms movement clause code so that we can pass context, etc., - via function arguments instead of the namespace. 
The goal is to - simplify the function interface for end users while still maintaining - good performance characteristics when parameters change during - a simulation run (i.e., not have to recompile the functions every time - the params change). - - A function like: - - def commuters(t): - typical = np.minimum( - data['population'][:], - data['commuters_by_node'], - ) - actual = np.binomial(typical, data['move_control']) - return np.multinomial(actual, predef['commuting_probability']) - - Will be rewritten as: - - def commuters(ctx, t): - typical = np.minimum( - ctx.data.resolve_name('population')[:], - ctx.data.resolve_name('commuters_by_node'), - ) - actual = np.binomial(typical, ctx.data.resolve_name('move_control')) - return np.multinomial(actual, ctx.data.resolve_name('commuting_probability')) - """ - - clause_name: str = "" - - def _report_line(self, node: HasLineNo): - return f"{self.clause_name} line: {node.lineno}" - - def visit_FunctionDef(self, node: ast.FunctionDef) -> Any: - """Modify function parameters.""" - self.clause_name = f"`{node.name}`" - new_node = self.generic_visit(node) - if isinstance(new_node, ast.FunctionDef): - ctx_arg = ast.arg( - arg='ctx', - annotation=ast.Name(id='MovementContext', ctx=ast.Load()), - ) - new_node.args.args = [ctx_arg, *new_node.args.args] - return new_node diff --git a/epymorph/movement/movement_model.py b/epymorph/movement/movement_model.py deleted file mode 100644 index b2228ce0..00000000 --- a/epymorph/movement/movement_model.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -The basis of the movement model system in epymorph. -This module contains all of the elements needed to define a -movement model, but Rume of it is left to the mm_exec module. 
-""" -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Callable, Protocol - -import numpy as np -from numpy.typing import NDArray - -from epymorph.data_type import AttributeArray, SimDType -from epymorph.error import AttributeException, MmSimException -from epymorph.simulation import (AttributeDef, NamespacedAttributeResolver, - SimDimensions, Tick, TickDelta) - - -class MovementContext(Protocol): - """The subset of the RumeContext that the movement model clauses need.""" - - @property - @abstractmethod - def dim(self) -> SimDimensions: - """The simulation dimensions.""" - - @property - @abstractmethod - def rng(self) -> np.random.Generator: - """The simulation's random number generator.""" - - @property - @abstractmethod - def data(self) -> NamespacedAttributeResolver: - """The resolver for simulation data.""" - - -PredefData = dict[str, AttributeArray] -PredefClause = Callable[[MovementContext], PredefData] - - -class TravelClause(ABC): - """A clause moving individuals from their home location to another.""" - - name: str - - @abstractmethod - def predicate(self, ctx: MovementContext, tick: Tick) -> bool: - """Should this clause apply this tick?""" - - @abstractmethod - def requested(self, ctx: MovementContext, tick: Tick) -> NDArray[SimDType]: - """Evaluate this clause for the given tick, returning a requested movers array (N,N).""" - - @abstractmethod - def returns(self, ctx: MovementContext, tick: Tick) -> TickDelta: - """Calculate when this clause's movers should return (which may vary from tick-to-tick).""" - - -MovementPredicate = Callable[[MovementContext, Tick], bool] -"""A predicate which decides if a clause should fire this tick.""" - -MovementFunction = Callable[[MovementContext, Tick], NDArray[SimDType]] -""" -A function which calculates the requested number of individuals to move due to this clause this tick. -Returns an (N,N) array of integers. 
-""" - -ReturnsFunction = Callable[[MovementContext, Tick], TickDelta] -"""A function which decides when this clause's movers should return.""" - - -class DynamicTravelClause(TravelClause): - """ - A travel clause implementation where each method proxies to a lambda. - This allows us to build travel clauses dynamically at runtime. - """ - - name: str - - _move: MovementPredicate - _requested: MovementFunction - _returns: ReturnsFunction - - def __init__(self, - name: str, - move_predicate: MovementPredicate, - requested: MovementFunction, - returns: ReturnsFunction): - self.name = name - self._move = move_predicate - self._requested = requested - self._returns = returns - - def predicate(self, ctx: MovementContext, tick: Tick) -> bool: - return self._move(ctx, tick) - - def requested(self, ctx: MovementContext, tick: Tick) -> NDArray[SimDType]: - try: - return self._requested(ctx, tick) - except KeyError as e: - # NOTE: catching KeyError here will be necessary (to get nice error messages) - # until we can properly validate the MM clauses. - msg = f"Missing attribute {e} required by movement model clause '{self.name}'." - raise AttributeException(msg) from None - except Exception as e: - # NOTE: catching exceptions here is necessary to get nice error messages - # for some value error cause by incorrect parameter and/or clause definition - msg = f"Error from applying clause '{self.name}': see exception trace" - raise MmSimException(msg) from e - - def returns(self, ctx: MovementContext, tick: Tick) -> TickDelta: - return self._returns(ctx, tick) - - -@dataclass(frozen=True) -class MovementModel: - """ - The movement model divides a day into simulation parts (tau steps) under the assumption - that each day part will have movement characteristics relevant to the simulation. - That is: there is no reason to have tau steps smaller than 1 day unless it's relevant - to movement. 
- """ - - tau_steps: list[float] - """The tau steps for the simulation.""" - - attributes: list[AttributeDef] - - predef: PredefClause - """The predef clause for this movement model.""" - - clauses: list[TravelClause] - """The clauses which express the movement model""" diff --git a/epymorph/movement/parser.py b/epymorph/movement/parser.py deleted file mode 100644 index f63ab45b..00000000 --- a/epymorph/movement/parser.py +++ /dev/null @@ -1,226 +0,0 @@ -"""Parsing of MovementSpecs.""" -from typing import Literal, NamedTuple, cast - -import pyparsing as P -from pyparsing import pyparsing_common as PC - -import epymorph.movement.parser_util as p -from epymorph.error import MmParseException -from epymorph.simulation import AttributeDef - -# A MovementSpec has the following object structure: -# -# - MovementSpec -# - MoveSteps (1) -# - AttributeDef (0 or more) -# - Predef (0 or 1) -# - MovementClause (1 or more) - -############################################################ -# MoveSteps -############################################################ - - -class MoveSteps(NamedTuple): - """The data model for a MoveSteps clause.""" - step_lengths: list[float] - """The lengths of each tau step. 
This should sum to 1.""" - - -move_steps: P.ParserElement = p.tag('move-steps', [ - p.field('per-day', PC.integer)('num_steps'), - p.field('duration', p.num_list)('steps') -]) - - -@move_steps.set_parse_action -def marshal_move_steps(results: P.ParseResults): - """Convert a pyparsing result to a MoveSteps.""" - fields = results.as_dict() - return MoveSteps(fields['steps']) - - -############################################################ -# Attributes -############################################################ - - -attribute: P.ParserElement = p.tag('attrib', [ - p.field('name', p.name), - p.field('type', p.dtype), - p.field('shape', p.shape), - p.field('default_value', p.scalar_value | p.none), - p.field('comment', p.quoted), -]) - - -@attribute.set_parse_action -def marshal_attribute(results: P.ParseResults): - """Convert a pyparsing result to an Attribute.""" - fields = results.as_dict() - field_type = fields['type'][0] - default_value = fields['default_value'][0] - - # We can coerce integers to floats for convenience. 
- if field_type == float and isinstance(default_value, int): - default_value = float(default_value) - - return AttributeDef( - name=fields['name'], - type=field_type, - shape=fields['shape'], - default_value=default_value, - comment=fields['comment'], - ) - - -############################################################ -# Predef -############################################################ - - -class Predef(NamedTuple): - """The data model of a predef clause.""" - function: str - - -predef: P.ParserElement = p.tag('predef', [p.field('function', p.fn_body('function'))]) -"""The parser for a predef clause.""" - - -@predef.set_parse_action -def marshal_predef(results: P.ParseResults): - """Convert a pyparsing result into a Predef.""" - fields = results.as_dict() - return Predef(fields['function']) - - -############################################################ -# MovementClause -############################################################ - - -day_list: P.ParserElement = p.bracketed( - P.delimited_list(P.one_of('M T W Th F Sa Su')) -) -"""Parser for a square-bracketed list of days-of-the-week.""" - - -DayOfWeek = Literal['M', 'T', 'W', 'Th', 'F', 'Sa', 'Su'] -"""Type for days of the week values.""" - -ALL_DAYS: list[DayOfWeek] = ['M', 'T', 'W', 'Th', 'F', 'Sa', 'Su'] -"""A list of all days of the week values.""" - - -class DailyClause(NamedTuple): - """ - The data model for a daily movement clause. - Note: leave_step and return_step should be 0-indexed in this form. - """ - # Conversion from 1-indexed steps happens during parsing. - days: list[DayOfWeek] - leave_step: int - duration: p.Duration - return_step: int - function: str - - -daily: P.ParserElement = p.tag('mtype', [ - p.field('days', ('all' | day_list)), - p.field('leave', PC.integer), - p.field('duration', p.duration), - p.field('return', PC.integer), - p.field('function', p.fn_body) -]) -""" -Parser for a DailyClause. 
e.g.: -``` -[mtype: days=[all]; leave=0; duration=7d; return=0; function= -def simple_movement(t, src, dst): - return 10 -] -``` -""" - - -@daily.set_parse_action -def marshal_daily(instring: str, loc: int, results: P.ParseResults): - """Convert a pyparsing result to a Daily.""" - fields = results.as_dict() - days = fields['days'] - leave_step = fields['leave'] - duration = fields['duration'][0] - return_step = fields['return'] - function = fields['function'] - if not isinstance(days, list): - msg = f"Unsupported value for movement clause daily: days ({days})" - raise P.ParseException(instring, loc, msg) - elif days == ['all']: - days = ALL_DAYS.copy() - else: - days = cast(list[DayOfWeek], days) - if leave_step < 1: - msg = f"movement clause daily: leave step must be at least 1 (value was: {leave_step})" - raise P.ParseException(instring, loc, msg) - if return_step < 1: - msg = f"movement clause daily: return step must be at least 1 (value was: {leave_step})" - raise P.ParseException(instring, loc, msg) - return DailyClause(days, leave_step - 1, duration, return_step - 1, function) - - -MovementClause = DailyClause -"""Data classes representing all possible movement clauses.""" - - -############################################################ -# MovementSpec -############################################################ - - -class MovementSpec(NamedTuple): - """The data model for a movement model spec.""" - steps: MoveSteps - attributes: list[AttributeDef] - predef: Predef | None - clauses: list[MovementClause] - - -def parse_movement_spec(string: str) -> MovementSpec: - """Parse a MovementSpec from the given string.""" - try: - result = movement_spec.parse_string(string, parse_all=True) - return cast(MovementSpec, result[0]) - except Exception as e: - msg = "Unable to parse MovementModel." 
- raise MmParseException(msg) from e - - -movement_spec = P.OneOrMore( - move_steps | - attribute | - predef | - daily -).ignore(p.code_comment) -"""The parser for MovementSpec.""" - - -@movement_spec.set_parse_action -def marshal_movement(instring: str, loc: int, results: P.ParseResults): - """Convert a pyparsing result to a MovementSpec.""" - s = [x for x in results if isinstance(x, MoveSteps)] - a = [x for x in results if isinstance(x, AttributeDef)] - c = [x for x in results if isinstance(x, MovementClause)] - d = [x for x in results if isinstance(x, Predef)] - if len(s) < 1 or len(s) > 1: - msg = f"Invalid movement specification: expected 1 steps clause, but found {len(s)}." - raise P.ParseException(instring, loc, msg) - if len(c) < 1: - msg = "Invalid movement specification: expected more than one movement clause, but found 0." - raise P.ParseException(instring, loc, msg) - if len(d) > 1: - msg = f"Invalid movement specification: expected 0 or 1 predef clause, but found {len(d)}" - raise P.ParseException(instring, loc, msg) - - predef_clause = d[0] if len(d) == 1 else None - return P.ParseResults(MovementSpec(steps=s[0], attributes=a, predef=predef_clause, clauses=c)) diff --git a/epymorph/movement/parser_util.py b/epymorph/movement/parser_util.py deleted file mode 100644 index 09d4c0fc..00000000 --- a/epymorph/movement/parser_util.py +++ /dev/null @@ -1,186 +0,0 @@ -""" -Common parsing utilities. -""" -from functools import reduce -from typing import NamedTuple - -import pyparsing as P -from pyparsing import pyparsing_common as PC - -from epymorph.data_shape import parse_shape - -# It's likely this will need to move to a different package (i.e., not `movement`) -# but it's fine here for now since movement is the only thing with a spec parser. 
- -E = P.ParserElement -_ = P.Suppress -l = P.Literal -none = l('None') - - -@none.set_parse_action -def marshal_none(_results: P.ParseResults): - """Marshal a None value.""" - return P.ParseResults([None]) - - -quoted = P.QuotedString(quote_char='"', unquote_results=True) |\ - P.QuotedString(quote_char="'", unquote_results=True) -"""Allow both single- or double-quote-delimited strings.""" - -name: E = P.Word( - init_chars=P.srange("[a-zA-Z]"), - body_chars=P.srange("[a-zA-Z0-9_]"), -) -"""A name string, suitable for use as a Python variable name, for instance.""" - -code_comment: E = P.AtLineStart('#') + ... + P.LineEnd() -"""Parser for Python-style code comments.""" - - -def field(field_name: str, value_parser: E) -> E: - """Parser for a clause field, like `=`""" - return _(l(field_name) + l('=')) + value_parser.set_results_name(field_name) - - -def bracketed(value_parser: E) -> E: - """Wrap another parser to surround it with square brackets.""" - return _('[') + value_parser + _(']') - - -def tag(tag_name: str, fields: list[E]) -> E: - """ - Parser for a spec tag: this is a top-level item, surrounded by square brackets, - identified by a tag name and containing one-or-more fields. 
e.g.: - `[: =; =]` - """ - def combine(p1, p2): - return p1 + _(';') + p2 - field_list = reduce(combine, fields[1:], fields[0]) - return bracketed(_(l(tag_name) + l(":")) - field_list) - - -num_list: E = bracketed(P.delimited_list(PC.fraction | PC.fnumber)) -"""Parser for a list of numbers in brackets, like: `[1,2,3]`""" - - -class Duration(NamedTuple): - """Data class for a duration expression.""" - value: int - unit: str - - def to_days(self) -> int: - """Return the equivalent number of days.""" - if self.unit == 'd': - return self.value - elif self.unit == 'w': - return self.value * 7 - else: - raise ValueError(f"unsupported unit {self.unit}") - - -duration = P.Group(PC.integer + P.one_of('d w')) -"""Parser for a duration expression.""" - - -@duration.set_parse_action -def marshal_duration(results: P.ParseResults): - """Convert a pyparsing result to a Duration object.""" - [value, unit] = results[0] - return P.ParseResults(Duration(value, unit)) - - -fn_body = P.SkipTo(P.AtLineStart(']'))\ - .set_parse_action(lambda toks: toks.as_list()[0].strip()) -""" -Parser for a function body, which runs until the end of a clause ends. -For example, if a tag value should be a function, you can define it like this: -`p.field('function', p.fn_body('function'))` which can be used to parse things like: -``` -[: function= -def my_function(): - return 'hello world' -] -``` -""" - - -# Shapes - - -shape = P.one_of("S T N TxN NxN") -""" -Describes the dimensions of an array in terms of the simulation. -For example "TxN" describes a two-dimensional array which is the -number of simulation days in the first dimensions and the number -of geo nodes in the second dimension. See `epymorph.data_shape` for more info. -(Excludes Shapes with arbitrary dimensions for simplicity.) 
-""" - - -@shape.set_parse_action -def marshal_shape(results: P.ParseResults): - """Convert a pyparsing result to a Shape object.""" - value = str(results[0]) - return P.ParseResults(parse_shape(value)) - - -# dtypes - - -base_dtype = P.one_of('int float str') -"""One of epymorph's base permitted data types.""" - - -@base_dtype.set_parse_action -def marshal_base_dtype(results: P.ParseResults): - """Convert a pyparsing result to dtype object.""" - match results[0]: - case 'int': - return int - case 'float': - return float - case 'str': - return str - case x: - return P.ParseException(f"Unable to parse '{x}' as a dtype.") - - -struct_dtype_field = _('(') + name('name') + _(',') + base_dtype('dtype') + _(')') -"""A single named field in a structured dtype.""" - - -@struct_dtype_field.set_parse_action -def marshal_struct_dtype_field(results: P.ParseResults): - """Convert a pyparsing result to a tuple representing a single named field in a structured dtype.""" - return (results['name'], results['dtype']) - - -struct_dtype = bracketed(P.delimited_list(struct_dtype_field)) -"""A complete structured dtype.""" - - -@struct_dtype.set_parse_action -def marshal_struct_dtype(results: P.ParseResults): - """Convert a pyparsing results to a structured dtype object.""" - return [results.as_list()] - - -dtype = base_dtype | struct_dtype - - -# Scalar Values - - -base_scalar = quoted | PC.number - -tuple_scalar = _('(') + P.delimited_list(base_scalar, ',') + _(')') - - -@tuple_scalar.set_parse_action -def marshal_tuple_scalar(results: P.ParseResults): - """Convert a pyparsing result to a tuple of values.""" - return tuple(results.as_list()) - - -scalar_value = tuple_scalar | base_scalar diff --git a/epymorph/movement/test/__init__.py b/epymorph/movement/test/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/epymorph/movement/test/compile_test.py b/epymorph/movement/test/compile_test.py deleted file mode 100644 index 328680c8..00000000 --- 
a/epymorph/movement/test/compile_test.py +++ /dev/null @@ -1,57 +0,0 @@ -# pylint: disable=missing-docstring -import unittest -from unittest.mock import MagicMock - -from epymorph.code import compile_function, parse_function -from epymorph.movement.compile import (ClauseFunctionTransformer, - PredefFunctionTransformer) -from epymorph.movement.movement_model import MovementContext - - -class TestMovementClauseTransformer(unittest.TestCase): - - def test_transform_clause(self): - source = """ - def foo(t, src, dst): - return t * src * dst - """ - - ast1 = parse_function(source) - f1 = compile_function(ast1, {}) - - transformer = ClauseFunctionTransformer([]) - ast2 = transformer.visit_and_fix(ast1) - - f2 = compile_function(ast2, { - 'MovementContext': MovementContext, - }) - - ctx = MagicMock(spec=MovementContext) - - self.assertEqual(f1(3, 5, 7), 105) - self.assertEqual(f2(ctx, 3, 5, 7), 105) - with self.assertRaises(TypeError): - f2(3, 5, 7) - - def test_transform_predef(self): - source = """ - def my_predef(): - return {'foo': 42} - """ - - ast1 = parse_function(source) - f1 = compile_function(ast1, {}) - - transformer = PredefFunctionTransformer([]) - ast2 = transformer.visit_and_fix(ast1) - - f2 = compile_function(ast2, { - 'MovementContext': MovementContext, - }) - - ctx = MagicMock(spec=MovementContext) - - self.assertEqual(f1(), {'foo': 42}) - self.assertEqual(f2(ctx), {'foo': 42}) - with self.assertRaises(TypeError): - f2() diff --git a/epymorph/movement/test/parser_test.py b/epymorph/movement/test/parser_test.py deleted file mode 100644 index a7a80e39..00000000 --- a/epymorph/movement/test/parser_test.py +++ /dev/null @@ -1,111 +0,0 @@ -# pylint: disable=missing-docstring -import unittest - -from pyparsing import ParseBaseException - -from epymorph.data_shape import Shapes -from epymorph.data_type import CentroidType -from epymorph.movement.parser import (DailyClause, MoveSteps, attribute, daily, - move_steps) -from epymorph.movement.parser_util import 
Duration -from epymorph.simulation import AttributeDef - - -class TestMoveSteps(unittest.TestCase): - def test_successful(self): - cases = [ - '[move-steps: per-day=2; duration=[2/3, 1/3]]', - '[move-steps: per-day=1; duration=[2/3]]', - '[ move-steps : per-day = 3 ; duration = [0.25, 0.25, 0.5] ]', - '[move-steps: per-day=3; duration=[0.25, 0.25, 0.5]]', - '[move-steps:per-day=3;duration=[0.25, 0.25, 0.5]]' - ] - exps = [ - MoveSteps([2 / 3, 1 / 3]), - MoveSteps([2 / 3]), - MoveSteps([0.25, 0.25, 0.5]), - MoveSteps([0.25, 0.25, 0.5]), - MoveSteps([0.25, 0.25, 0.5]) - ] - for c, e in zip(cases, exps): - a = move_steps.parse_string(c)[0] - self.assertEqual(a, e, f"{str(a)} did not match {str(e)}") - - def test_failures(self): - cases = [ - '[move-steps: duration=[2/3, 1/3]; per-day=2]', - '[move-steps: per-day=2; duration=[]]', - '[move-steps-2: per-day=2; duration=[]]' - ] - for c in cases: - with self.assertRaises(ParseBaseException): - move_steps.parse_string(c) - - -class TestAttribute(unittest.TestCase): - def test_successful(self): - cases = [ - '[attrib: name=commuters; type=int; shape=NxN; default_value=None; comment="hey1"]', - '[attrib: name=move_control; type=float; shape=TxN; default_value=42; comment="hey2"]', - '[attrib: name=move_control; type=float; shape=TxN; default_value=-32.7;\n comment="hey3"]', - '[attrib:\nname=theta;\ntype=str;\nshape=S;\ndefault_value="hi";\ncomment="hey4"]', - '[attrib: name=centroids; type=[(longitude, float), (latitude, float)]; shape=N; default_value=(1.0, 2.0); comment="hey5"]', - ] - exps = [ - AttributeDef('commuters', int, Shapes.NxN, None, 'hey1'), - AttributeDef('move_control', float, Shapes.TxN, 42.0, 'hey2'), - AttributeDef('move_control', float, Shapes.TxN, -32.7, 'hey3'), - AttributeDef('theta', str, Shapes.S, 'hi', 'hey4'), - AttributeDef('centroids', CentroidType, Shapes.N, (1.0, 2.0), 'hey5'), - ] - for c, e in zip(cases, exps): - a = attribute.parse_string(c)[0] - self.assertEqual(a, e, f"{str(a)} did not 
match {str(e)}") - - def test_failures(self): - cases = [ - '[attrib: name=commuters; type=int; shape=NxN; default_value=23; comment="hey1"]', - '[attrib: name=move_control; type=uint8; shape=TxN; default_value=1; comment="hey2"]', - '[attrib: name=move_control; type=float; shape=TxA; default_value=27.3; comment="hey3"]', - ] - for c in cases: - with self.assertRaises(ParseBaseException): - move_steps.parse_string(c) - - -class TestDailyMoveClause(unittest.TestCase): - def test_successful_01(self): - case = '[mtype: days=[M,Th,Sa]; leave=2; duration=1d; return=4; function=\ndef(t):\n return 1\n]' - exp = DailyClause( - days=['M', 'Th', 'Sa'], - leave_step=1, - duration=Duration(1, 'd'), - return_step=3, - function='def(t):\n return 1' - ) - act = daily.parse_string(case)[0] - self.assertEqual(act, exp) - - def test_successful_02(self): - case = '[mtype: days=all; leave=1; duration=2w; return=2; function=\ndef(t):\n return 1\n]' - exp = DailyClause( - days=['M', 'T', 'W', 'Th', 'F', 'Sa', 'Su'], - leave_step=0, - duration=Duration(2, 'w'), - return_step=1, - function='def(t):\n return 1' - ) - act = daily.parse_string(case)[0] - self.assertEqual(act, exp) - - def test_failed_01(self): - # Invalid leave step - with self.assertRaises(ParseBaseException): - case = '[mtype: days=all; leave=0; duration=2w; return=2; function=\ndef(t):\n return 1\n]' - daily.parse_string(case) - - def test_failed_02(self): - # Invalid days value - with self.assertRaises(ParseBaseException): - case = '[mtype: days=[X]; leave=0; duration=2w; return=2; function=\ndef(t):\n return 1\n]' - daily.parse_string(case) diff --git a/epymorph/movement_model.py b/epymorph/movement_model.py new file mode 100644 index 00000000..443c071d --- /dev/null +++ b/epymorph/movement_model.py @@ -0,0 +1,276 @@ +""" +The basis of the movement model system in epymorph. +Movement models are responsible for dividing up the day +into one or more parts, in accordance with their desired +tempo of movement. 
(For example, commuting patterns of work day +versus night.) Movement mechanics are expressed using a set of +clauses which calculate a requested number of individuals move +between geospatial nodes at a particular time step of the simulation. +""" +import re +from abc import ABC, ABCMeta, abstractmethod +from functools import cached_property +from math import isclose +from typing import Any, Literal, Sequence, Type, TypeVar, cast + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data_shape import SimDimensions +from epymorph.data_type import SimDType +from epymorph.geography.scope import GeoScope +from epymorph.simulation import (AttributeDef, NamespacedAttributeResolver, + SimulationFunctionClass, + SimulationTickFunction, Tick, TickDelta, + TickIndex) +from epymorph.util import are_instances + +DayOfWeek = Literal['M', 'T', 'W', 'Th', 'F', 'Sa', 'Su'] +"""Type for days of the week values.""" + +ALL_DAYS: tuple[DayOfWeek, ...] = ('M', 'T', 'W', 'Th', 'F', 'Sa', 'Su') +"""All days of the week values.""" + +_day_of_week_pattern = r"\b(M|T|W|Th|F|Sa|Su)\b" + + +def parse_days_of_week(dow: str) -> tuple[DayOfWeek, ...]: + """ + Parses the string as a list of days of the week using our standard abbreviations: M,T,W,Th,F,Sa,Su. + This parser is pretty permissive, simply ignoring invalid parts of the input while keeping the valid parts. + Any separator is allowed between the day of the week themselves. Returns an empty tuple if there are no matches. 
+ """ + ds = re.findall(_day_of_week_pattern, dow) + return tuple(set(ds)) + + +class MovementPredicate(ABC): + """Checks the current tick and responds with True or False.""" + + @abstractmethod + def evaluate(self, tick: Tick) -> bool: + """Check the given tick.""" + + +class EveryDay(MovementPredicate): + """Return True for every day.""" + + def evaluate(self, tick: Tick) -> bool: + return True + + +class DayIs(MovementPredicate): + """Checks that the day is in the given set of days of the week.""" + + week_days: tuple[DayOfWeek, ...] + + def __init__(self, week_days: Sequence[DayOfWeek] | str): + if isinstance(week_days, str): + self.week_days = parse_days_of_week(week_days) + else: + self.week_days = tuple(week_days) + + def evaluate(self, tick: Tick) -> bool: + return tick.date.weekday() in self.week_days + + +################## +# MovementClause # +################## + + +_TypeT = TypeVar("_TypeT") + + +class MovementClauseClass(SimulationFunctionClass): + """ + The metaclass for user-defined MovementClause classes. + Used to verify proper class implementation. + """ + def __new__( + mcs: Type[_TypeT], + name: str, + bases: tuple[type, ...], + dct: dict[str, Any], + ) -> _TypeT: + # Skip these checks for known base classes: + if name in ("MovementClause",): + return super().__new__(mcs, name, bases, dct) + + # Check predicate. + predicate = dct.get("predicate") + if predicate is None or not isinstance(predicate, MovementPredicate): + raise TypeError( + f"Invalid predicate in {name}: please specify a MovementPredicate instance." + ) + # Check leaves. + leaves = dct.get("leaves") + if leaves is None or not isinstance(leaves, TickIndex): + raise TypeError( + f"Invalid leaves in {name}: please specify a TickIndex instance." + ) + if leaves.step < 0: + raise TypeError( + f"Invalid leaves in {name}: step indices cannot be less than zero." + ) + # Check returns. 
+ returns = dct.get("returns") + if returns is None or not isinstance(returns, TickDelta): + raise TypeError( + f"Invalid returns in {name}: please specify a TickDelta instance." + ) + if returns.step < 0: + raise TypeError( + f"Invalid returns in {name}: step indices cannot be less than zero." + ) + if returns.days < 0: + raise TypeError( + f"Invalid returns in {name}: days cannot be less than zero." + ) + + return super().__new__(mcs, name, bases, dct) + + +class MovementClause(SimulationTickFunction[NDArray[SimDType]], ABC, metaclass=MovementClauseClass): + """ + A movement clause is basically a function which calculates _how many_ individuals + should move between all of the geo nodes, and then epymorph decides by random draw + _which_ individuals move (as identified by their disease status, or IPM compartment). + It also has various settings which determine when the clause is active + (for example, only move people Monday-Friday at the start of the day) + and when the individuals that were moved by the clause should return home + (for example, stay for two days and then return at the end of the day). + """ + + # in addition to requirements (from super), movement clauses must also specify: + + predicate: MovementPredicate + """When does this movement clause apply?""" + + leaves: TickIndex + """On which tau step does this movement clause apply?""" + + returns: TickDelta + """When do the movers from this clause return home?""" + + def is_active(self, tick: Tick) -> bool: + """Should this movement clause be applied this tick?""" + return self.leaves.step == tick.step and self.predicate.evaluate(tick) + + @abstractmethod + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + """ + Implement this method to provide logic for the clause. + Your implementation is free to use `data`, `dim`, and `rng` in this function body. + You can also use `defer` to utilize another MovementClause instance. 
+ """ + + def evaluate_in_context( + self, + data: NamespacedAttributeResolver, + dim: SimDimensions, + scope: GeoScope, + rng: np.random.Generator, + tick: Tick + ) -> NDArray[SimDType]: + """ + Evaluate this function within a context. + epymorph calls this function; you generally don't need to. + """ + requested = super().evaluate_in_context(data, dim, scope, rng, tick) + np.fill_diagonal(requested, 0) + return requested + + +################# +# MovementModel # +################# + + +class MovementModelClass(ABCMeta): + """ + The metaclass for user-defined MovementModel classes. + Used to verify proper class implementation. + """ + def __new__( + mcs: Type[_TypeT], + name: str, + bases: tuple[type, ...], + dct: dict[str, Any], + ) -> _TypeT: + # Skip these checks for known base classes: + if name in ("MovementModel",): + return super().__new__(mcs, name, bases, dct) + + # Check tau steps. + steps = dct.get("steps") + if steps is None or not isinstance(steps, (list, tuple)): + raise TypeError( + f"Invalid steps in {name}: please specify as a list or tuple." + ) + if not are_instances(steps, float): + raise TypeError( + f"Invalid steps in {name}: must be floating point numbers." + ) + if len(steps) == 0: + raise TypeError( + f"Invalid steps in {name}: please specify at least one tau step length." + ) + if not isclose(sum(steps), 1.0, abs_tol=1e-6): + raise TypeError( + f"Invalid steps in {name}: steps must sum to 1." + ) + if any(x <= 0 for x in steps): + raise TypeError( + f"Invalid steps in {name}: steps must all be greater than 0." + ) + dct["steps"] = tuple(steps) + + # Check clauses. + clauses = dct.get("clauses") + if clauses is None or not isinstance(clauses, (list, tuple)): + raise TypeError( + f"Invalid clauses in {name}: please specify as a list or tuple." + ) + if not are_instances(clauses, MovementClause): + raise TypeError( + f"Invalid clauses in {name}: must be instances of MovementClause." 
+ ) + if len(clauses) == 0: + raise TypeError( + f"Invalid clauses in {name}: please specify at least one clause." + ) + for c in cast(Sequence[MovementClause], clauses): + # Check that clause steps are valid. + num_steps = len(steps) + if c.leaves.step >= num_steps: + raise TypeError( + f"Invalid clauses in {name}: {c.__class__.__name__} uses a leave step ({c.leaves.step}) " + f"which is not a valid index. (steps: {steps})" + ) + if c.returns.step >= num_steps: + raise TypeError( + f"Invalid clauses in {name}: {c.__class__.__name__} uses a return step ({c.returns.step}) " + f"which is not a valid index. (steps: {steps})" + ) + dct["clauses"] = tuple(clauses) + + return super().__new__(mcs, name, bases, dct) + + +class MovementModel(ABC, metaclass=MovementModelClass): + """ + A MovementModel (MM) describes a pattern of geospatial movement for individuals in the model. + The MM chops the day up into one or more parts (tau steps), and then describes movement clauses + which trigger for certain parts of the day. 
+ """ + + steps: Sequence[float] + clauses: Sequence[MovementClause] + + @cached_property + def requirements(self) -> Sequence[AttributeDef]: + """The combined requirements of all of the clauses in this model.""" + return [req + for clause in self.clauses + for req in clause.requirements] diff --git a/epymorph/rume.py b/epymorph/rume.py index 875cd856..515bf449 100644 --- a/epymorph/rume.py +++ b/epymorph/rume.py @@ -7,10 +7,12 @@ import dataclasses import textwrap from abc import ABC, abstractmethod +from copy import deepcopy from dataclasses import dataclass, field from functools import cached_property -from itertools import accumulate -from typing import Callable, Mapping, OrderedDict, Self, Sequence, final +from itertools import accumulate, pairwise, starmap +from typing import (Callable, Mapping, NamedTuple, OrderedDict, Self, Sequence, + final) import numpy as np from numpy.typing import NDArray @@ -25,11 +27,10 @@ from epymorph.database import AbsoluteName, ModuleNamePattern, NamePattern from epymorph.geography.scope import GeoScope from epymorph.initializer import Initializer -from epymorph.movement.parser import (DailyClause, MovementClause, - MovementSpec, MoveSteps) +from epymorph.movement_model import MovementClause, MovementModel from epymorph.params import ParamSymbol, ParamValue, simulation_symbols from epymorph.simulation import (DEFAULT_STRATA, META_STRATA, AttributeDef, - TimeFrame, gpm_strata) + TickDelta, TickIndex, TimeFrame, gpm_strata) from epymorph.util import are_unique, map_values ####### @@ -46,7 +47,7 @@ class Gpm: name: str ipm: CompartmentModel - mm: MovementSpec + mm: MovementModel init: Initializer params: Mapping[ModuleNamePattern, ParamValue] @@ -54,7 +55,7 @@ def __init__( self, name: str, ipm: CompartmentModel, - mm: MovementSpec, + mm: MovementModel, init: Initializer, params: Mapping[str, ParamValue] | None = None, ): @@ -80,7 +81,13 @@ def __init__( """ -def combine_tau_steps(strata_tau_lengths: dict[str, list[float]]) -> 
tuple[list[float], dict[str, dict[int, int]], dict[str, dict[int, int]]]: +class _CombineTauStepsResult(NamedTuple): + new_tau_steps: tuple[float, ...] + start_mapping: dict[str, dict[int, int]] + stop_mapping: dict[str, dict[int, int]] + + +def combine_tau_steps(strata_tau_lengths: dict[str, Sequence[float]]) -> _CombineTauStepsResult: """ When combining movement models with different tau steps, it is necessary to create a new tau step scheme which can accomodate them all. This function performs that calculation, @@ -91,10 +98,10 @@ def combine_tau_steps(strata_tau_lengths: dict[str, list[float]]) -> tuple[list[ """ # Convert the tau lengths into the starting point and stopping point for each tau step. # Starts and stops are expressed as fractions of one day. - def tau_starts(taus: list[float]) -> list[float]: + def tau_starts(taus: Sequence[float]) -> Sequence[float]: return [0.0, *accumulate(taus)][:-1] - def tau_stops(taus: list[float]) -> list[float]: + def tau_stops(taus: Sequence[float]) -> Sequence[float]: return [*accumulate(taus)] strata_tau_starts = map_values(tau_starts, strata_tau_lengths) @@ -108,10 +115,10 @@ def tau_stops(taus: list[float]) -> list[float]: combined_tau_stops.sort() # Now calculate the combined tau lengths. 
- combined_tau_lengths = [ + combined_tau_lengths = tuple( stop - start for start, stop in zip(combined_tau_starts, combined_tau_stops) - ] + ) # But the individual strata MMs are indexed by their original tau steps, # so we need to calculate the appropriate re-indexing to the new tau steps @@ -125,44 +132,40 @@ def tau_stops(taus: list[float]) -> list[float]: for name, curr in strata_tau_stops.items() } - return combined_tau_lengths, tau_start_mapping, tau_stop_mapping + return _CombineTauStepsResult(combined_tau_lengths, tau_start_mapping, tau_stop_mapping) -def remap_taus(strata_mms: list[tuple[str, MovementSpec]]) -> OrderedDict[str, MovementSpec]: +def remap_taus(strata_mms: list[tuple[str, MovementModel]]) -> OrderedDict[str, MovementModel]: """ When combining movement models with different tau steps, it is necessary to create a new tau step scheme which can accomodate them all. """ new_tau_steps, start_mapping, stop_mapping = combine_tau_steps({ - strata: mm.steps.step_lengths + strata: mm.steps for strata, mm in strata_mms }) def clause_remap_tau(clause: MovementClause, strata: str) -> MovementClause: - match clause: - case DailyClause(): - return DailyClause( - days=clause.days, - leave_step=start_mapping[strata][clause.leave_step], - duration=clause.duration, - return_step=stop_mapping[strata][clause.return_step], - function=clause.function, - ) - - def spec_remap_tau(orig_spec: MovementSpec, strata: str) -> MovementSpec: - return MovementSpec( - steps=MoveSteps(new_tau_steps), - attributes=orig_spec.attributes, - predef=orig_spec.predef, - clauses=[ - clause_remap_tau(c, strata) - for c in orig_spec.clauses - ], + leave_step = start_mapping[strata][clause.leaves.step] + return_step = stop_mapping[strata][clause.returns.step] + + clone = deepcopy(clause) + clone.leaves = TickIndex(leave_step) + clone.returns = TickDelta(clause.returns.days, return_step) + return clone + + def model_remap_tau(orig_model: MovementModel, strata: str) -> MovementModel: + 
clone = deepcopy(orig_model) + clone.steps = new_tau_steps + clone.clauses = tuple( + clause_remap_tau(c, strata) + for c in orig_model.clauses ) + return clone return OrderedDict([ - (strata_name, spec_remap_tau(spec, strata_name)) - for strata_name, spec in strata_mms + (strata_name, model_remap_tau(model, strata_name)) + for strata_name, model in strata_mms ]) @@ -179,7 +182,7 @@ class Rume(ABC): strata: Sequence[Gpm] ipm: BaseCompartmentModel - mms: OrderedDict[str, MovementSpec] + mms: OrderedDict[str, MovementModel] scope: GeoScope time_frame: TimeFrame params: Mapping[NamePattern, ParamValue] @@ -195,7 +198,7 @@ def __post_init__(self): # but they all have the same set of tau steps so it doesn't matter # which we use. (Using the first one is safe.) first_strata = self.strata[0].name - tau_step_lengths = self.mms[first_strata].steps.step_lengths + tau_step_lengths = self.mms[first_strata].steps dim = SimDimensions.build( tau_step_lengths=tau_step_lengths, @@ -208,7 +211,7 @@ def __post_init__(self): object.__setattr__(self, 'dim', dim) @cached_property - def attributes(self) -> Mapping[AbsoluteName, AttributeDef]: + def requirements(self) -> Mapping[AbsoluteName, AttributeDef]: """Returns the attributes required by the RUME.""" def generate_items(): # IPM attributes are already fully named. @@ -216,48 +219,51 @@ def generate_items(): # Name the MM and Init attributes. for gpm in self.strata: strata_name = gpm_strata(gpm.name) - for a in gpm.mm.attributes: + for a in gpm.mm.requirements: yield AbsoluteName(strata_name, "mm", a.name), a for a in gpm.init.requirements: yield AbsoluteName(strata_name, "init", a.name), a return OrderedDict(generate_items()) - def compartment_mask(self, strata_name: str) -> NDArray[np.bool_]: + @cached_property + def compartment_mask(self) -> Mapping[str, NDArray[np.bool_]]: """ - Returns a mask which describes which compartments belong in the given strata. 
-        For example: if the model has three strata ('1', '2', and '3') with three compartments each,
-        `strata_compartment_mask('2')` returns `[0 0 0 1 1 1 0 0 0]`
+        Masks that describe which compartments belong in the given strata.
+        For example: if the model has three strata ('a', 'b', and 'c') with three compartments each,
+        `compartment_mask['b']` returns `[0 0 0 1 1 1 0 0 0]`
         (where 0 stands for False and 1 stands for True).
-        Raises ValueError if no strata matches the given name.
         """
-        result = np.full(shape=self.ipm.num_compartments,
-                         fill_value=False, dtype=np.bool_)
-        ci, cf = 0, 0
-        found = False
-        for gpm in self.strata:
-            # Iterate through the strata IPMs:
-            ipm = gpm.ipm
-            if strata_name != gpm.name:
-                # keep count of how many compartments precede our target strata
-                ci += ipm.num_compartments
-            else:
-                # when we find our target, we now have the target's compartment index range
-                cf = ci + ipm.num_compartments
-                # set those to True and break
-                result[ci:cf] = True
-                found = True
-                break
-        if not found:
-            raise ValueError(f"Not a valid strata name in this model: {strata_name}")
-        return result
-
-    def compartment_mobility(self, strata_name: str) -> NDArray[np.bool_]:
-        """Calculates which compartments should be considered subject to movement in a particular strata."""
-        compartment_mobility = np.array(
+        def mask(length: int, true_slice: slice) -> NDArray[np.bool_]:
+            # A boolean array with the given slice set to True, all others False
+            m = np.zeros(shape=length, dtype=np.bool_)
+            m[true_slice] = True
+            return m
+
+        # num of compartments in the combined IPM
+        C = self.ipm.num_compartments
+        # num of compartments in each strata
+        strata_cs = [gpm.ipm.num_compartments for gpm in self.strata]
+        # start and stop index for each strata
+        strata_ranges = pairwise([0, *accumulate(strata_cs)])
+        # map strata name to the mask for each strata
+        return dict(zip(
+            [g.name for g in self.strata],
+            [mask(C, s) for s in starmap(slice, strata_ranges)]
+        ))
+
+    
@cached_property + def compartment_mobility(self) -> Mapping[str, NDArray[np.bool_]]: + """Masks that describe which compartments should be considered subject to movement in a particular strata.""" + # The mobility mask for all strata. + all_mobility = np.array( ['immobile' not in c.tags for c in self.ipm.compartments], dtype=np.bool_ ) - return self.compartment_mask(strata_name) * compartment_mobility + # Mobility for a single strata is all_mobility boolean-and whether the compartment is in that strata. + return { + strata.name: all_mobility & self.compartment_mask[strata.name] + for strata in self.strata + } @abstractmethod def name_display_formatter(self) -> Callable[[AbsoluteName | NamePattern], str]: @@ -267,7 +273,7 @@ def params_description(self) -> str: """Provide a description of all attributes required by the RUME.""" format_name = self.name_display_formatter() lines = [] - for name, attr in self.attributes.items(): + for name, attr in self.requirements.items(): properties = [ f"type: {dtype_str(attr.type)}", f"shape: {attr.shape}", @@ -289,7 +295,7 @@ def generate_params_dict(self) -> str: """Generate a skeleton dictionary you can use to provide parameter values to the room.""" format_name = self.name_display_formatter() lines = ["{"] - for name, attr in self.attributes.items(): + for name, attr in self.requirements.items(): value = 'PLACEHOLDER' if attr.default_value is not None: value = str(attr.default_value) @@ -319,7 +325,7 @@ class SingleStrataRume(Rume): def build( cls, ipm: CompartmentModel, - mm: MovementSpec, + mm: MovementModel, init: Initializer, scope: GeoScope, time_frame: TimeFrame, diff --git a/epymorph/simulation.py b/epymorph/simulation.py index c4c9e9c4..83a536a7 100644 --- a/epymorph/simulation.py +++ b/epymorph/simulation.py @@ -86,6 +86,11 @@ class Tick(NamedTuple): """What's the tau length of the current step? 
(0.666,0.333,0.666,0.333,...)""" +class TickIndex(NamedTuple): + """A zero-based index of the simulation tau steps.""" + step: int # which tau step within that day (zero-indexed) + + class TickDelta(NamedTuple): """ An offset relative to a Tick expressed as a number of days which should elapse, @@ -371,8 +376,8 @@ def rng(self) -> np.random.Generator: """The simulation's random number generator.""" @abstractmethod - def defer(self, other: 'SimulationFunction[_DeferredT]') -> _DeferredT: - """Defer processing to another instance of a SimulationFunction.""" + def export(self) -> tuple[NamespacedAttributeResolver, SimDimensions, GeoScope, np.random.Generator]: + """Tuples the contents of this context so it can be re-used (see: defer()).""" class _EmptyContext(_Context): @@ -391,7 +396,7 @@ def scope(self) -> GeoScope: def rng(self) -> np.random.Generator: raise TypeError("Invalid access of function context.") - def defer(self, other: 'SimulationFunction[_DeferredT]') -> _DeferredT: + def export(self) -> tuple[NamespacedAttributeResolver, SimDimensions, GeoScope, np.random.Generator]: raise TypeError("Invalid access of function context.") @@ -431,8 +436,11 @@ def scope(self) -> GeoScope: def rng(self) -> np.random.Generator: return self._rng - def defer(self, other: 'SimulationFunction[_DeferredT]') -> _DeferredT: - return other.evaluate_in_context(self._data, self._dim, self._scope, self._rng) + def export(self) -> tuple[NamespacedAttributeResolver, SimDimensions, GeoScope, np.random.Generator]: + return (self._data, self._dim, self._scope, self._rng) + + +_TypeT = TypeVar("_TypeT") class SimulationFunctionClass(ABCMeta): @@ -441,11 +449,11 @@ class SimulationFunctionClass(ABCMeta): Used to verify proper class implementation. """ def __new__( - mcs: Type['SimulationFunctionClass'], + mcs: Type[_TypeT], name: str, bases: tuple[type, ...], dct: dict[str, Any], - ) -> 'SimulationFunctionClass': + ) -> _TypeT: # Check requirements if this class overrides it. 
# (Otherwise class will inherit from parent.) if (reqs := dct.get("requirements")) is not None: @@ -483,10 +491,11 @@ def __new__( return super().__new__(mcs, name, bases, dct) -class SimulationFunction(ABC, Generic[T_co], metaclass=SimulationFunctionClass): +class BaseSimulationFunction(ABC, Generic[T_co], metaclass=SimulationFunctionClass): """ A function which runs in the context of a simulation to produce a value (as a numpy array). - Implement a SimulationFunction by extending this class and overriding the `evaluate()` method. + This base class exists to share functionality without limiting the function signature + of evaluate(). """ requirements: Sequence[AttributeDef] = () @@ -494,28 +503,23 @@ class SimulationFunction(ABC, Generic[T_co], metaclass=SimulationFunctionClass): _ctx: _Context = _EMPTY_CONTEXT - def evaluate_in_context( + def with_context( self, data: NamespacedAttributeResolver, dim: SimDimensions, scope: GeoScope, rng: np.random.Generator, - ) -> T_co: - """Evaluate a function within a context. epymorph calls this function; you generally don't need to.""" + ) -> Self: + """ + Constructs a clone of this instance which has access to the given context. + epymorph calls this function; you generally don't need to. + """ # clone this instance, then run evaluate on that; accomplishes two things: # 1. don't have to worry about cleaning up _ctx # 2. instances can use @cached_property without surprising results clone = deepcopy(self) setattr(clone, "_ctx", _RealContext(data, dim, scope, rng)) - return clone.evaluate() - - @abstractmethod - def evaluate(self) -> T_co: - """ - Implement this method to provide logic for the function. - Your implementation is free to use `data`, `dim`, and `rng` in this function body. - You can also use `defer` to utilize another SimulationFunction instance. 
- """ + return clone def data(self, attribute: AttributeDef | str) -> NDArray: """Retrieve the value of a specific attribute.""" @@ -547,10 +551,76 @@ def rng(self) -> np.random.Generator: """The simulation's random number generator.""" return self._ctx.rng + +class SimulationFunction(BaseSimulationFunction[T_co]): + """ + A function which runs in the context of a simulation to produce a value (as a numpy array). + Implement a SimulationFunction by extending this class and overriding the `evaluate()` method. + """ + + def evaluate_in_context( + self, + data: NamespacedAttributeResolver, + dim: SimDimensions, + scope: GeoScope, + rng: np.random.Generator, + ) -> T_co: + """ + Evaluate this function within a context. + epymorph calls this function; you generally don't need to. + """ + return super()\ + .with_context(data, dim, scope, rng)\ + .evaluate() + + @abstractmethod + def evaluate(self) -> T_co: + """ + Implement this method to provide logic for the function. + Your implementation is free to use `data`, `dim`, and `rng` in this function body. + You can also use `defer` to utilize another SimulationFunction instance. + """ + @final def defer(self, other: 'SimulationFunction[_DeferredT]') -> _DeferredT: """Defer processing to another instance of a SimulationFunction.""" - return self._ctx.defer(other) + return other.evaluate_in_context(*self._ctx.export()) + + +class SimulationTickFunction(BaseSimulationFunction[T_co]): + """ + A function which runs in the context of a simulation to produce a sim-time-specific value (as a numpy array). + Implement a SimulationTickFunction by extending this class and overriding the `evaluate()` method. + """ + + def evaluate_in_context( + self, + data: NamespacedAttributeResolver, + dim: SimDimensions, + scope: GeoScope, + rng: np.random.Generator, + tick: Tick + ) -> T_co: + """ + Evaluate this function within a context. + epymorph calls this function; you generally don't need to. 
+ """ + return super()\ + .with_context(data, dim, scope, rng)\ + .evaluate(tick) + + @abstractmethod + def evaluate(self, tick: Tick) -> T_co: + """ + Implement this method to provide logic for the function. + Your implementation is free to use `data`, `dim`, and `rng` in this function body. + You can also use `defer` to utilize another SimulationTickFunction instance. + """ + + @final + def defer(self, other: 'SimulationTickFunction[_DeferredT]', tick: Tick) -> _DeferredT: + """Defer processing to another instance of a SimulationTickFunction.""" + return other.evaluate_in_context(*self._ctx.export(), tick) ############### diff --git a/epymorph/simulator/basic/basic_simulator.py b/epymorph/simulator/basic/basic_simulator.py index 67d67741..881286d1 100644 --- a/epymorph/simulator/basic/basic_simulator.py +++ b/epymorph/simulator/basic/basic_simulator.py @@ -18,7 +18,7 @@ from epymorph.simulator.basic.mm_exec import MovementExecutor from epymorph.simulator.basic.output import Output from epymorph.simulator.data import (evaluate_params, initialize_rume, - validate_attributes) + validate_requirements) from epymorph.simulator.world_list import ListWorld @@ -62,7 +62,7 @@ def run( }, rng=rng, ) - validate_attributes(rume, db) + validate_requirements(rume, db) except AttributeException as e: msg = f"RUME attribute requirements were not met. 
See errors:\n- {e}" raise SimValidationException(msg) from None diff --git a/epymorph/simulator/basic/mm_exec.py b/epymorph/simulator/basic/mm_exec.py index 1d41a049..612b4796 100644 --- a/epymorph/simulator/basic/mm_exec.py +++ b/epymorph/simulator/basic/mm_exec.py @@ -1,18 +1,11 @@ -from typing import NamedTuple - import numpy as np from numpy.typing import NDArray -from epymorph.data_shape import SimDimensions from epymorph.data_type import AttributeArray, SimDType -from epymorph.database import (Database, DatabaseWithFallback, ModuleNamespace, - NamePattern) -from epymorph.error import MmCompileException +from epymorph.database import Database, ModuleNamespace +from epymorph.error import MmSimException from epymorph.event import (MovementEventsMixin, OnMovementClause, OnMovementFinish, OnMovementStart) -from epymorph.movement.compile import compile_spec -from epymorph.movement.movement_model import (MovementContext, MovementModel, - TravelClause) from epymorph.rume import Rume from epymorph.simulation import (NamespacedAttributeResolver, Tick, gpm_strata, resolve_tick_delta) @@ -21,13 +14,12 @@ def calculate_travelers( - # General movement model info. - ctx: MovementContext, - # Clause info. - clause: TravelClause, + clause_name: str, clause_mobility: NDArray[np.bool_], + requested_movers: NDArray[SimDType], + available_movers: NDArray[SimDType], tick: Tick, - local_cohorts: NDArray[SimDType], + rng: np.random.Generator ) -> OnMovementClause: """ Calculate the number of travelers resulting from this movement clause for this tick. @@ -35,27 +27,28 @@ def calculate_travelers( then selects exactly which individuals (by compartment) should move. Returns an (N,N,C) array; from-source-to-destination-by-compartment. """ - _, N, C, _ = ctx.dim.TNCE + # Extract number of nodes and cohorts from the provided array. 
+ (N, C) = available_movers.shape - clause_movers = clause.requested(ctx, tick) - np.fill_diagonal(clause_movers, 0) - clause_sum = clause_movers.sum(axis=1, dtype=SimDType) + initial_requested_movers = requested_movers + np.fill_diagonal(requested_movers, 0) + requested_sum = requested_movers.sum(axis=1, dtype=SimDType) - available_movers = local_cohorts * clause_mobility + available_movers = available_movers * clause_mobility available_sum = available_movers.sum(axis=1, dtype=SimDType) # If clause requested total is greater than the total available, # use mvhg to select as many as possible. - if not np.any(clause_sum > available_sum): + if not np.any(requested_sum > available_sum): throttled = False - requested_movers = clause_movers - requested_sum = clause_sum + # requested_movers = requested_movers + # requested_sum = requested_sum else: throttled = True - requested_movers = clause_movers.copy() + requested_movers = requested_movers.copy() for src in range(N): - if clause_sum[src] > available_sum[src]: - requested_movers[src, :] = ctx.rng.multivariate_hypergeometric( + if requested_sum[src] > available_sum[src]: + requested_movers[src, :] = rng.multivariate_hypergeometric( colors=requested_movers[src, :], nsample=available_sum[src] ) @@ -70,14 +63,14 @@ def calculate_travelers( continue # Select which individuals will be leaving this node. - mover_cs = ctx.rng.multivariate_hypergeometric( + mover_cs = rng.multivariate_hypergeometric( available_movers[src, :], requested_sum[src] ).astype(SimDType) # Select which location they are each going to. # (Each row contains the compartments for a destination.) 
- travelers_cs[src, :, :] = ctx.rng.multinomial( + travelers_cs[src, :, :] = rng.multinomial( mover_cs, requested_prb[src, :] ).T.astype(SimDType) @@ -86,26 +79,14 @@ def calculate_travelers( tick.sim_index, tick.day, tick.step, - clause.name, - clause_movers, + clause_name, + initial_requested_movers, travelers_cs, requested_sum.sum(), throttled, ) -class _Ctx(NamedTuple): - dim: SimDimensions - rng: np.random.Generator - data: NamespacedAttributeResolver - - -class _StrataInfo(NamedTuple): - model: MovementModel - mobility: NDArray[np.bool_] - ctx: _Ctx - - class MovementExecutor: """Movement model execution specifically for multi-strata simulations.""" @@ -115,10 +96,9 @@ class MovementExecutor: """the world state""" _rng: np.random.Generator """the simulation RNG""" - _event_target: MovementEventsMixin - _data: Database[AttributeArray] - _strata: dict[str, _StrataInfo] + _event_target: MovementEventsMixin + _strata_data: dict[str, NamespacedAttributeResolver] def __init__( self, @@ -128,47 +108,20 @@ def __init__( rng: np.random.Generator, event_target: MovementEventsMixin, ): - # Introduce a new data layer so we have a place to store predefs - data = DatabaseWithFallback({}, db) - self._rume = rume self._world = world - self._data = data self._rng = rng self._event_target = event_target - self._strata = { - strata: _StrataInfo( - # Compile movement model - model=compile_spec(mm, rng), - # Get compartment mobility for this strata - mobility=rume.compartment_mobility(strata), - # Assemble a context with a resolver for this strata - ctx=_Ctx( - dim=rume.dim, - rng=rng, - data=NamespacedAttributeResolver( - data=data, - dim=rume.dim, - namespace=ModuleNamespace(gpm_strata(strata), "mm"), - ), - ), + + # Create a per-strata attribute resolver for MM clauses. 
+ self._strata_data = { + strata: NamespacedAttributeResolver( + data=db, + dim=rume.dim, + namespace=ModuleNamespace(gpm_strata(strata), "mm"), ) for strata, mm in rume.mms.items() } - self._compute_predefs() - - def _compute_predefs(self) -> None: - """Compute predefs and store results to our database.""" - for strata, (model, _, ctx) in self._strata.items(): - result = model.predef(ctx) - if not isinstance(result, dict): - msg = f"Movement predef: did not return a dictionary result (got: {type(result)})" - raise MmCompileException(msg) - for key, value in result.items(): - if not isinstance(value, np.ndarray): - msg = f"Movement predef: key '{key}' invalid; it is not a numpy array." - pattern = NamePattern(gpm_strata(strata), "mm", key) - self._data.update(pattern, value.copy()) def apply(self, tick: Tick) -> None: """Applies movement for this tick, mutating the world state.""" @@ -178,19 +131,39 @@ def apply(self, tick: Tick) -> None: # Process travel clauses. total = 0 - for model, mobility, ctx in self._strata.values(): + for strata, model in self._rume.mms.items(): for clause in model.clauses: - if not clause.predicate(ctx, tick): + if not clause.is_active(tick): continue - local_array = self._world.get_local_array() + + available_movers = self._world.get_local_array() + + try: + requested_movers = clause.evaluate_in_context( + self._strata_data[strata], + self._rume.dim, + self._rume.scope, + self._rng, + tick + ) + except Exception as e: + # NOTE: catching exceptions here is necessary to get nice error messages + # for some value error cause by incorrect parameter and/or clause definition + msg = f"Error from applying clause '{clause.__class__.__name__}': see exception trace" + raise MmSimException(msg) from e clause_event = calculate_travelers( - ctx, clause, mobility, tick, local_array) + clause.__class__.__name__, + self._rume.compartment_mobility[strata], + requested_movers, + available_movers, + tick, + self._rng + ) 
self._event_target.on_movement_clause.publish(clause_event) travelers = clause_event.actual - returns = clause.returns(ctx, tick) - return_tick = resolve_tick_delta(ctx.dim, tick, returns) + return_tick = resolve_tick_delta(self._rume.dim, tick, clause.returns) self._world.apply_travel(travelers, return_tick) total += travelers.sum() diff --git a/epymorph/simulator/basic/test/basic_simulator_test.py b/epymorph/simulator/basic/test/basic_simulator_test.py index 2a22fa6d..ab2b4d05 100644 --- a/epymorph/simulator/basic/test/basic_simulator_test.py +++ b/epymorph/simulator/basic/test/basic_simulator_test.py @@ -277,4 +277,6 @@ def test_mm_clause_error(self): err_msg = str(e.exception) self.assertIn( - "Error from applying clause 'dispersers': see exception trace", err_msg) + "Error from applying clause 'Dispersers': see exception trace", + err_msg + ) diff --git a/epymorph/simulator/data.py b/epymorph/simulator/data.py index 55bc5268..7813895f 100644 --- a/epymorph/simulator/data.py +++ b/epymorph/simulator/data.py @@ -175,13 +175,13 @@ def evaluate_params( # Evaluate every attribute required by the RUME. attr_db, evaluate = _evaluation_context(rume, override_params, rng) - rume_attributes: list[tuple[AbsoluteName, ParamValue | None]] = [ - *((name, attr.default_value) for name, attr in rume.attributes.items()), + rume_reqs: list[tuple[AbsoluteName, ParamValue | None]] = [ + *((name, attr.default_value) for name, attr in rume.requirements.items()), # Artificially require the special geo labels attribute. (GEO_LABELS, rume.scope.get_node_ids()), ] - for name, default_value in rume_attributes: + for name, default_value in rume_reqs: try: evaluate(name, [], default_value) except AttributeException as e: @@ -214,15 +214,15 @@ def evaluate_param( return evaluate(param, [], None) -def validate_attributes( +def validate_requirements( rume: Rume, data: Database[AttributeArray], ) -> None: """ - Validate all attributes in a RUME; raises an ExceptionGroup containing all errors. 
+ Validate all attribute requirements in a RUME; raises an ExceptionGroup containing all errors. """ def validate() -> Generator[AttributeException, None, None]: - for name, attr in rume.attributes.items(): + for name, attr in rume.requirements.items(): attr_match = data.query(name) if attr_match is None: msg = f"Missing required parameter: '{name}'" diff --git a/epymorph/simulator/test/data_test.py b/epymorph/simulator/test/data_test.py index 12586a5e..946bcf06 100644 --- a/epymorph/simulator/test/data_test.py +++ b/epymorph/simulator/test/data_test.py @@ -103,7 +103,7 @@ def test_eval_1(self): # We should have as many entries in our DB as we have attributes in the RUME, # plus 1 (for geo labels). - self.assertEqual(len(db._data), len(rume.attributes) + 1) + self.assertEqual(len(db._data), len(rume.requirements) + 1) self.assert_db(db, "gpm:aaa::ipm::beta", np.array(0.4, dtype=np.float64)) self.assert_db(db, "gpm:bbb::ipm::beta", np.array(0.3, dtype=np.float64)) diff --git a/epymorph/test/movement_model_test.py b/epymorph/test/movement_model_test.py new file mode 100644 index 00000000..d86b45ee --- /dev/null +++ b/epymorph/test/movement_model_test.py @@ -0,0 +1,163 @@ +# pylint: disable=missing-docstring,unused-variable +import unittest + +import numpy as np +from numpy.typing import NDArray + +from epymorph.data_type import SimDType +from epymorph.movement_model import EveryDay, MovementClause, MovementModel +from epymorph.simulation import Tick, TickDelta, TickIndex + + +class MovementClauseTest(unittest.TestCase): + + def test_create_01(self): + # Successful clause! 
+ class MyClause(MovementClause): + leaves = TickIndex(step=0) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + + clause = MyClause() + + self.assertEqual(clause.leaves, TickIndex(step=0)) + self.assertEqual(clause.returns, TickDelta(days=0, step=1)) + + def test_create_02(self): + # Test for error: forgot 'leaves' + with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + # leaves = TickIndex(step=0) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + self.assertIn("invalid leaves in myclause", str(e.exception).lower()) + + def test_create_03(self): + # Test for error: forgot 'returns' + with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + leaves = TickIndex(step=0) + # returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + self.assertIn("invalid returns in myclause", str(e.exception).lower()) + + def test_create_04(self): + # Test for error: forgot 'predicate' + with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + leaves = TickIndex(step=0) + returns = TickDelta(days=0, step=1) + # predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + self.assertIn("invalid predicate in myclause", str(e.exception).lower()) + + def test_create_05(self): + # Test for error: invalid 'leaves' index + with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + leaves = TickIndex(step=-23) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + self.assertIn("step indices cannot be less than zero", str(e.exception).lower()) + + def test_create_06(self): + # Test for error: invalid 'returns' index 
+ with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + leaves = TickIndex(step=0) + returns = TickDelta(days=0, step=-23) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + self.assertIn("step indices cannot be less than zero", str(e.exception).lower()) + + +class MovementModelTest(unittest.TestCase): + + class MyClause(MovementClause): + leaves = TickIndex(step=0) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + + def test_create_01(self): + class MyModel(MovementModel): + steps = [1 / 3, 2 / 3] + clauses = [MovementModelTest.MyClause()] + + model = MyModel() + self.assertEqual(model.steps, (1 / 3, 2 / 3)) + self.assertEqual(len(model.clauses), 1) + self.assertEqual(model.clauses[0].__class__.__name__, "MyClause") + + def test_create_02(self): + # Test for error: forgot 'steps' + with self.assertRaises(TypeError) as e: + class MyModel(MovementModel): + # steps = [1 / 3, 2 / 3] + clauses = [MovementModelTest.MyClause()] + self.assertIn("invalid steps in mymodel", str(e.exception).lower()) + + def test_create_03(self): + # Test for error: 'steps' don't sum to 1 + with self.assertRaises(TypeError) as e: + class MyModel1(MovementModel): + steps = [1 / 3, 1 / 3] + clauses = [MovementModelTest.MyClause()] + self.assertIn("steps must sum to 1", str(e.exception).lower()) + + with self.assertRaises(TypeError) as e: + class MyModel2(MovementModel): + steps = [0.1, 0.75, 0.3, 0.2] + clauses = [MovementModelTest.MyClause()] + self.assertIn("steps must sum to 1", str(e.exception).lower()) + + def test_create_04(self): + # Test for error: 'steps' aren't all greater than zero + with self.assertRaises(TypeError) as e: + class MyModel(MovementModel): + steps = [1 / 3, -1 / 3, 1 / 3, 2 / 3] + clauses = [MovementModelTest.MyClause()] + self.assertIn("steps must all be greater than 0", str(e.exception).lower()) 
+ + def test_create_05(self): + # Test for error: forgot 'clauses' + with self.assertRaises(TypeError) as e: + class MyModel(MovementModel): + steps = [1 / 3, 2 / 3] + # clauses = [MovementModelTest.MyClause()] + self.assertIn("invalid clauses", str(e.exception).lower()) + + def test_create_06(self): + # Test for error: clauses reference steps which don't exist + with self.assertRaises(TypeError) as e: + class MyClause(MovementClause): + leaves = TickIndex(0) + returns = TickDelta(days=0, step=9) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[SimDType]: + return np.array([0]) + + class MyModel(MovementModel): + steps = (1 / 3, 2 / 3) + clauses = (MyClause(),) + self.assertIn("return step (9)", str(e.exception).lower()) + self.assertIn("not a valid index", str(e.exception).lower()) diff --git a/epymorph/test/rume_test.py b/epymorph/test/rume_test.py index 541faacb..f15f773a 100644 --- a/epymorph/test/rume_test.py +++ b/epymorph/test/rume_test.py @@ -1,15 +1,16 @@ # pylint: disable=missing-docstring +import numpy as np from numpy.testing import assert_array_equal +from numpy.typing import NDArray from epymorph import AttributeDef, Shapes, TimeFrame, init, mm_library from epymorph.compartment_model import CompartmentModel, compartment, edge from epymorph.database import AbsoluteName from epymorph.geography.us_census import StateScope -from epymorph.movement.parser import (ALL_DAYS, DailyClause, MovementSpec, - MoveSteps) -from epymorph.movement.parser_util import Duration +from epymorph.movement_model import EveryDay, MovementClause, MovementModel from epymorph.rume import (DEFAULT_STRATA, Gpm, MultistrataRume, SingleStrataRume, combine_tau_steps, remap_taus) +from epymorph.simulation import Tick, TickDelta, TickIndex from epymorph.test import EpymorphTestCase @@ -100,49 +101,43 @@ def test_combine_tau_steps_4(self): }) def test_remap_taus_1(self): - mm1 = MovementSpec( - steps=MoveSteps([1 / 3, 2 / 3]), - attributes=[], - predef=None, - 
clauses=[ - DailyClause( - days=ALL_DAYS, - leave_step=0, - duration=Duration(0, 'd'), - return_step=1, - function='place-hodor', - ), - ], - ) + class Clause1(MovementClause): + leaves = TickIndex(0) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() - mm2 = MovementSpec( - steps=MoveSteps([1 / 2, 1 / 2]), - attributes=[], - predef=None, - clauses=[ - DailyClause( - days=ALL_DAYS, - leave_step=1, - duration=Duration(0, 'd'), - return_step=1, - function='place-hodor', - ), - ], - ) + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + return np.array([]) - new_mms = remap_taus([('a', mm1), ('b', mm2)]) + class Model1(MovementModel): + steps = (1 / 3, 2 / 3) + clauses = (Clause1(),) - new_taus = new_mms["a"].steps.step_lengths + class Clause2(MovementClause): + leaves = TickIndex(1) + returns = TickDelta(days=0, step=1) + predicate = EveryDay() + + def evaluate(self, tick: Tick) -> NDArray[np.int64]: + return np.array([]) + + class Model2(MovementModel): + steps = (1 / 2, 1 / 2) + clauses = (Clause2(),) + + new_mms = remap_taus([('a', Model1()), ('b', Model2())]) + + new_taus = new_mms["a"].steps self.assertListAlmostEqual(new_taus, [1 / 3, 1 / 6, 1 / 2]) self.assertEqual(len(new_mms), 2) new_mm1 = new_mms['a'] - self.assertEqual(new_mm1.clauses[0].leave_step, 0) - self.assertEqual(new_mm1.clauses[0].return_step, 2) + self.assertEqual(new_mm1.clauses[0].leaves.step, 0) + self.assertEqual(new_mm1.clauses[0].returns.step, 2) new_mm2 = new_mms['b'] - self.assertEqual(new_mm2.clauses[0].leave_step, 2) - self.assertEqual(new_mm2.clauses[0].return_step, 2) + self.assertEqual(new_mm2.clauses[0].leaves.step, 2) + self.assertEqual(new_mm2.clauses[0].returns.step, 2) class RumeTest(EpymorphTestCase): @@ -152,7 +147,7 @@ def test_create_monostrata_1(self): sir = Sir() centroids = mm_library['centroids']() # Make sure centroids has the tau steps we will expect later... 
- self.assertListAlmostEqual(centroids.steps.step_lengths, [1 / 3, 2 / 3]) + self.assertListAlmostEqual(centroids.steps, [1 / 3, 2 / 3]) rume = SingleStrataRume.build( ipm=sir, @@ -172,11 +167,11 @@ def test_create_monostrata_1(self): self.assertEqual(rume.dim.nodes, 2) assert_array_equal( - rume.compartment_mask(DEFAULT_STRATA), + rume.compartment_mask[DEFAULT_STRATA], [True, True, True], ) assert_array_equal( - rume.compartment_mobility(DEFAULT_STRATA), + rume.compartment_mobility[DEFAULT_STRATA], [True, True, True], ) @@ -186,7 +181,7 @@ def test_create_multistrata_1(self): sir = Sir() no = mm_library['no']() # Make sure 'no' has the tau steps we will expect later... - self.assertListAlmostEqual(no.steps.step_lengths, [1.0]) + self.assertListAlmostEqual(no.steps, [1.0]) rume = MultistrataRume.build( strata=[ @@ -218,24 +213,24 @@ def test_create_multistrata_1(self): self.assertEqual(rume.dim.nodes, 2) assert_array_equal( - rume.compartment_mask("aaa"), + rume.compartment_mask["aaa"], [True, True, True, False, False, False], ) assert_array_equal( - rume.compartment_mask("bbb"), + rume.compartment_mask["bbb"], [False, False, False, True, True, True], ) assert_array_equal( - rume.compartment_mobility("aaa"), + rume.compartment_mobility["aaa"], [True, True, True, False, False, False], ) assert_array_equal( - rume.compartment_mobility("bbb"), + rume.compartment_mobility["bbb"], [False, False, False, True, True, True], ) # NOTE: these tests will break if someone alters the MM or Init definition; even just the comments - self.assertDictEqual(rume.attributes, { + self.assertDictEqual(rume.requirements, { AbsoluteName("gpm:aaa", "ipm", "beta"): AttributeDef("beta", float, Shapes.TxN), AbsoluteName("gpm:aaa", "ipm", "gamma"): AttributeDef("gamma", float, Shapes.TxN), AbsoluteName("gpm:bbb", "ipm", "beta"): AttributeDef("beta", float, Shapes.TxN), @@ -252,7 +247,7 @@ def test_create_multistrata_2(self): sir = Sir() centroids = mm_library['centroids']() # Make sure 
centroids has the tau steps we will expect later... - self.assertListAlmostEqual(centroids.steps.step_lengths, [1 / 3, 2 / 3]) + self.assertListAlmostEqual(centroids.steps, [1 / 3, 2 / 3]) rume = MultistrataRume.build( strata=[ @@ -278,7 +273,7 @@ def test_create_multistrata_2(self): self.assertEqual(rume.dim.nodes, 2) # NOTE: these tests will break if someone alters the MM or Init definition; even just the comments - self.assertDictEqual(rume.attributes, { + self.assertDictEqual(rume.requirements, { AbsoluteName("gpm:aaa", "ipm", "beta"): AttributeDef("beta", float, Shapes.TxN), @@ -298,6 +293,10 @@ def test_create_multistrata_2(self): comment="Influences the distance that movers tend to travel.", default_value=40.0), + AbsoluteName("gpm:aaa", "mm", "commuter_proportion"): + AttributeDef("commuter_proportion", float, Shapes.S, default_value=0.1, + comment="Decides what proportion of the total population should be commuting normally."), + AbsoluteName("gpm:aaa", "init", "population"): AttributeDef("population", int, Shapes.N, comment="The population at each geo node."),