-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_usage.py
171 lines (144 loc) · 6.12 KB
/
data_usage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from abc import abstractmethod
from dataclasses import dataclass
from functools import partial
from math import floor, inf
from pathlib import Path
from shutil import disk_usage
from typing import Protocol, Sequence, runtime_checkable
from humanize import naturaldelta, naturalsize
@dataclass(frozen=True)
class EmptyDataEstimate:
    """A placeholder estimate for a data fetch operation whose data usage
    cannot be calculated from the data provided."""

    name: str
    """The name of the ADRIO this (empty) estimate was produced for."""
@dataclass(frozen=True)
class AvailableDataEstimate:
"""An estimate for the data usage of a data fetch operation.
Operations may download data and may utilize disk caching, so we would like
to be able to estimate ahead of time how much data to expect.
A concrete example of such an operation are ADRIOs fetch data from a third-party
source during the preparation of a RUME.
NOTE: all values are estimated and their accuracy may vary.
"""
name: str
"""What is responsible for loading this data?"""
cache_key: str
"""Multiple things may in fact load the same set of data; even though both would
report the same estimate for missing data, only the first one to load would really
incur that cost. The others would then find the cached data waiting.
This key should make it possible to discover this case -- if two estimates are
produced with the same key, it can be assumed that the estimate should only
be counted once. Cache keys are only comparable within a single simulation context,
so we don't need to perfectly distinguish between different scopes or time frames.
"""
new_network_bytes: int
"""How much new data (in bytes) will need to be downloaded."""
max_bandwidth: int | None
"""A source-specific limit on download bandwidth (in bytes per second).
(Some sources may impose known limits on downloads.)
"""
new_cache_bytes: int
"""How much new data (in bytes) will be written to disk cache."""
total_cache_bytes: int
"""The total data (in bytes) that will be in the cache after fetch.
This includes new cached files and previously cached files."""
# A data estimate is either an empty placeholder or an available estimate.
DataEstimate = EmptyDataEstimate | AvailableDataEstimate
@runtime_checkable
class CanEstimateData(Protocol):
    """Protocol for entities that can estimate their own data usage."""

    @abstractmethod
    def estimate_data(self) -> DataEstimate:
        """Produce a data usage estimate for this entity.

        When no reasonable estimate can be made, return an EmptyDataEstimate.
        """
@dataclass(frozen=True)
class DataEstimateTotal:
    """The combined totals of a collection of data estimates."""

    new_network_bytes: int
    """The amount of new data (in bytes) that will need to be downloaded."""

    new_cache_bytes: int
    """The amount of new data (in bytes) that will be written to disk cache."""

    total_cache_bytes: int
    """The total data (in bytes) that will be in the cache after the fetch."""

    download_time: float
    """The estimated time (in seconds) needed to download all new data."""
def estimate_total(
    estimates: Sequence[DataEstimate],
    max_bandwidth: int,
) -> DataEstimateTotal:
    """Roll a number of individual data estimates up into a single total.

    Only `AvailableDataEstimate` entries contribute, and when several of them
    share a cache key only the first one is counted. The total download time
    adds up, per counted estimate, its new network bytes divided by the smaller
    of the assumed bandwidth limit (`max_bandwidth`, in bytes per second) and
    the estimate's own source-specific bandwidth limit, if it has one.
    """
    # First pass: keep the first available estimate for each cache key.
    seen_keys = set[str]()
    counted = list[AvailableDataEstimate]()
    for est in estimates:
        if not isinstance(est, AvailableDataEstimate):
            continue
        if est.cache_key in seen_keys:
            continue
        seen_keys.add(est.cache_key)
        counted.append(est)

    # Second pass: accumulate download time in encounter order.
    seconds = 0.0
    for est in counted:
        # a missing (or zero) source limit leaves only the assumed limit
        source_limit = est.max_bandwidth or inf
        seconds += est.new_network_bytes / min(max_bandwidth, source_limit)

    return DataEstimateTotal(
        sum(est.new_network_bytes for est in counted),
        sum(est.new_cache_bytes for est in counted),
        sum(est.total_cache_bytes for est in counted),
        seconds,
    )
def estimate_report(
    cache_path: Path,
    estimates: Sequence[DataEstimate],
    max_bandwidth: int,
) -> list[str]:
    """Generate a report from the given set of data estimates.

    The report is returned as a list of lines. It starts with an itemized list
    saying, for each estimate, whether it will download new data, be pulled
    from cache, or has no estimate available. It then totals everything up:
    how much will be downloaded and how long that will take, how much new data
    will be written to disk cache, and whether or not there is enough
    available disk space for it.

    `cache_path` is the cache directory whose file system is checked for free
    space; if it does not exist yet, its nearest existing parent directory is
    checked instead. `estimates` are the estimates to report on, and
    `max_bandwidth` is the assumed download bandwidth (in bytes per second).
    """
    # short-hand formatting functions
    ff = partial(naturalsize, binary=False)  # format file size
    ft = naturaldelta  # format time duration

    cache_keys = set[str]()
    result = list[str]()
    for e in estimates:
        if isinstance(e, AvailableDataEstimate):
            # Nothing new is fetched when an earlier estimate already covers
            # the same cache key or when there is nothing left to download.
            # The amount written to cache must not influence this: the totals
            # below count every unique estimate's new_network_bytes, so an
            # estimate that downloads without caching still has to be listed
            # as a download.
            if e.cache_key in cache_keys or e.new_network_bytes == 0:
                line = f"- {e.name} will be pulled from cache"
            else:
                line = f"- {e.name} will download {ff(e.new_network_bytes)} of new data"
            cache_keys.add(e.cache_key)
        else:
            line = f"- {e.name} (no estimate available)"
        result.append(line)

    total = estimate_total(estimates, max_bandwidth)
    result.append("In total we will:")
    if total.new_network_bytes == 0:
        result.append("- Download no additional data")
    else:
        result.append(
            f"- Download {ff(total.new_network_bytes)}, "
            f"taking {ft(total.download_time)} "
            f"(assuming {ff(max_bandwidth)}/s)"
        )

    # disk_usage() fails on a path that does not exist, which is the case for
    # a cache directory that has not been created yet; walk up to the nearest
    # existing directory to measure free space there instead.
    probe = cache_path
    while not probe.exists() and probe != probe.parent:
        probe = probe.parent
    available_space = disk_usage(probe).free

    if total.new_cache_bytes == 0:
        result.append("- Write no new data to disk cache")
    elif total.new_cache_bytes < floor(available_space * 0.9):
        # comfortably within free space (less than 90% of it)
        result.append(
            f"- Write {ff(total.new_cache_bytes)} to disk cache "
            f"(you have {ff(available_space)} free space)"
        )
    elif total.new_cache_bytes < available_space:
        result.append(f"- Write {ff(total.new_cache_bytes)} to disk cache")
        result.append(
            "WARNING: this is very close to exceeding available free space "
            f"of {ff(available_space)}!"
        )
    else:
        result.append(f"- Write {ff(total.new_cache_bytes)} to disk cache")
        result.append(
            f"ERROR: this exceeds available free space of {ff(available_space)}!"
        )
    return result