-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent.py
188 lines (145 loc) · 5.17 KB
/
event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""
epymorph's event frameworks.
The idea is to have a set of classes which define event protocols for
logical components of epymorph.
"""
from typing import NamedTuple
from numpy.typing import NDArray
from epymorph.attribute import AbsoluteName
from epymorph.data_type import SimDType
from epymorph.rume import Rume
from epymorph.util import Event, Singleton
#####################
# Simulation Events #
#####################
class OnStart(NamedTuple):
"""The payload of a simulation on_start event."""
simulator: str
"""Name of the simulator class."""
rume: Rume
"""The RUME for the simulation."""
class OnTick(NamedTuple):
"""The payload of a Simulation tick event."""
tick_index: int
"""The index of the just-completed tick."""
ticks: int
"""The total number of ticks for the simulation."""
###################
# Movement Events #
###################
class OnMovementStart(NamedTuple):
"""The payload for the event when movement processing starts for a tick."""
tick: int
"""Which simulation tick."""
day: int
"""Which simulation day."""
step: int
"""Which tau step (by index)."""
class OnMovementClause(NamedTuple):
"""The payload for the event when a single movement clause has been processed."""
tick: int
"""Which simulation tick."""
day: int
"""Which simulation day."""
step: int
"""Which tau step (by index)."""
clause_name: str
"""The clause processed."""
requested: NDArray[SimDType]
"""
The number of individuals this clause 'wants' to move, that is,
the values returned by its clause function. (An NxN array.)
"""
actual: NDArray[SimDType]
"""
The actual number of individuals moved, by source, destination, and compartment.
(An NxNxC array.)
"""
total: int
"""The number of individuals moved by this clause."""
is_throttled: bool
"""
Did the clause request to move more people than were available (at any location)?
"""
class OnMovementFinish(NamedTuple):
"""
The payload for the event when movement processing finishes for one simulation tick.
"""
tick: int
"""Which simulation tick."""
day: int
"""Which simulation day."""
step: int
"""Which tau step (by index)."""
total: int
"""The total number of individuals moved during this tick."""
################
# ADRIO Events #
################
class DownloadActivity(NamedTuple):
"""A description of ADRIO network download activity.
All fields are optional, as it may be possible to measure some but not others
depending on the data source. ADRIOs should avoid reporting DownloadActivity
where all fields are None, and should instead just report None for
`AdrioProgress.download`"""
total: int | None
"""How many bytes are expected to be downloaded?"""
downloaded: int | None
"""How many bytes have been downloaded?"""
download_speed: int | None
"""What is the current approximate download speed?"""
class AdrioProgress(NamedTuple):
"""The payload of AdrioEvents.on_adrio_progress
Perhaps not all ADRIOs will report progress, but those that do
should emit one event when they start (with `ratio_complete` == 0)
and one when they finish (with `ratio_complete` == 1). They are free
to report as many intermediate progress events as they like."""
adrio_name: str
"""The full name of the ADRIO class."""
attribute: AbsoluteName
"""The name of the attribute being evaluated."""
final: bool
"""Is this the last progress update for this ADRIO?"""
ratio_complete: float
"""What ratio complete is it? (0: just started; 1: done)"""
download: DownloadActivity | None
"""Download activity if any, and if it can be measured."""
duration: float | None
"""If complete, how long did the ADRIO take overall (in seconds)?"""
############
# EventBus #
############
class EventBus(metaclass=Singleton):
"""The one-stop for epymorph events. This class uses the singleton pattern."""
# Simulation Events
on_start: Event[OnStart]
"""Event fires at the start of a simulation run."""
on_tick: Event[OnTick]
"""Event fires after each tick has been processed."""
on_finish: Event[None]
"""Event fires after a simulation run is complete."""
# Movement Events
on_movement_start: Event[OnMovementStart]
"""
Event fires at the start of the movement processing phase for every simulation tick.
"""
on_movement_clause: Event[OnMovementClause]
"""Event fires after processing each active movement clause."""
on_movement_finish: Event[OnMovementFinish]
"""
Event fires at the end of the movement processing phase for every simulation tick.
"""
# ADRIO Events
on_adrio_progress: Event[AdrioProgress]
"""Event fires when an ADRIO is fetching data."""
def __init__(self):
# SimulationEvents
self.on_start = Event()
self.on_tick = Event()
self.on_finish = Event()
# MovementEvents
self.on_movement_start = Event()
self.on_movement_clause = Event()
self.on_movement_finish = Event()
# AdrioEvents
self.on_adrio_progress = Event()