import requests
import pandas as pd
import urllib.parse
import urllib.error
import os
import time


# TODO: is there a way to represent this as a generalised count or not?
def count_lpa_boundary(
conn,
lpa: str,
expected: int,
organisation_entity: int = None,
comparison_rule: str = "equals_to",
geometric_relation: str = "within",
):
"""
Specific version of a count which given a local authority
and a dataset checks for any entities relating to the lpa boundary.
relation defaults to within but can be changed. This should only be used on geographic
datasets
args:
conn: sqlite connection used to connect to the db, wil be created by the checkpoint class
lpa: The reference to the local planning authority (geography dataset) boundary to use
expected: the expected count, must be a non-negative integer
organisation: optional additional filter to filter by organisation_entity as well as boundary
geometric_relation: how to decide if the data is related to the lpa boundary
"""
    # get the lpa boundary geometry from the platform API
    # TODO this should be moved to the sdk/api for accessing the platform
try:
base_url = "https://www.planning.data.gov.uk"
endpoint = f"curie/statistical-geography:{lpa}.json"
response = requests.get(
f"{base_url}/{endpoint}",
)
response.raise_for_status()
data = response.json()
lpa_geometry = data["geometry"]
except requests.exceptions.RequestException as err:
passed = False
message = f"An error occurred when retrieving lpa geometry from platform {err}"
details = {}
return passed, message, details
    # determine the spatial condition based on the geometric_relation parameter
spatial_options = {
"within": f"""
CASE
WHEN geometry != '' THEN ST_WITHIN(ST_GeomFromText(geometry), ST_GeomFromText('{lpa_geometry}'))
ELSE ST_WITHIN(ST_GeomFromText(point), ST_GeomFromText('{lpa_geometry}'))
END
""",
"intersects": f"""
CASE
WHEN geometry != '' THEN ST_INTERSECTS(ST_GeomFromText(geometry), ST_GeomFromText('{lpa_geometry}'))
ELSE ST_INTERSECTS(ST_GeomFromText(point), ST_GeomFromText('{lpa_geometry}'))
END
""",
"not_intersects": f"""
CASE
WHEN geometry != '' THEN NOT ST_INTERSECTS(ST_GeomFromText(geometry), ST_GeomFromText('{lpa_geometry}'))
ELSE NOT ST_INTERSECTS(ST_GeomFromText(point), ST_GeomFromText('{lpa_geometry}'))
END
""",
"centroid_within": f"""
CASE
WHEN point != '' THEN ST_WITHIN(ST_GeomFromText(point), ST_GeomFromText('{lpa_geometry}'))
ELSE ST_WITHIN(ST_CENTROID(ST_GeomFromText(geometry)), ST_GeomFromText('{lpa_geometry}'))
END
""",
}
if geometric_relation not in spatial_options:
raise ValueError(
f"Invalid geometric_relation: '{geometric_relation}'. Must be one of {list(spatial_options.keys())}."
)
spatial_condition = spatial_options[geometric_relation]
# set up initial query
query = """
SELECT entity
FROM entity
WHERE (geometry != '' OR point != '')
"""
if organisation_entity:
query = query + f"AND organisation_entity = '{organisation_entity}'"
query = query + f"AND ({spatial_condition});"
rows = conn.execute(query).fetchall()
entities = [row[0] for row in rows]
actual = len(entities)
    # define the comparison rules used to compare actual against expected
comparison_rules = {
"equals_to": actual == expected,
"not_equal_to": actual != expected,
"greater_than": actual > expected,
"greater_than_or_equal_to": actual >= expected,
"less_than": actual < expected,
"less_than_or_equal_to": actual <= expected,
}
# Perform comparison based on the specified operator
if comparison_rule not in comparison_rules:
raise ValueError(
f"Invalid comparison_operator: '{comparison_rule}'. Must be one of {list(comparison_rules.keys())}."
)
result = comparison_rules[comparison_rule]
message = f"there were {actual} entities found"
details = {
"actual": actual,
"expected": expected,
"entities": entities,
}
return result, message, details
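

# A minimal usage sketch for count_lpa_boundary, kept as a comment so the
# module stays import-safe. The ST_* functions in the query above assume a
# spatial extension such as SpatiaLite is loaded on the connection; the
# connection setup and the lpa reference below are illustrative assumptions,
# not part of this module.
#
#   import sqlite3
#   conn = sqlite3.connect("conservation-area.sqlite3")
#   conn.enable_load_extension(True)
#   conn.load_extension("mod_spatialite")
#   passed, message, details = count_lpa_boundary(
#       conn,
#       lpa="E60000001",  # hypothetical statistical-geography reference
#       expected=10,
#       comparison_rule="greater_than_or_equal_to",
#       geometric_relation="within",
#   )
#   print(message, details["actual"])
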
def count_deleted_entities(
conn,
expected: int,
organisation_entity: int = None,
):
    """
    Counts entities that are present in the entity table for an organisation
    but no longer appear in any active resource, i.e. entities that have been
    deleted upstream.

    args:
        conn: sqlite connection to the dataset db
        expected: the expected count of deleted entities
        organisation_entity: the organisation_entity whose entities should be checked
    """
    # get database name to identify dataset
db_path = conn.execute("PRAGMA database_list").fetchall()[0][2]
db_name = os.path.splitext(os.path.basename(db_path))[0]
# get dataset specific active resource list
params = urllib.parse.urlencode(
{
"sql": f"""select * from reporting_historic_endpoints rhe join organisation o on rhe.organisation=o.organisation
where pipeline == '{db_name}' and o.entity='{organisation_entity}' and resource_end_date == "" group by endpoint""",
"_size": "max",
}
)
base_url = f"https://datasette.planning.data.gov.uk/digital-land.csv?{params}"
    # datasette can intermittently fail to return data; if this occurs, wait a minute and retry
    max_retries = 60  # retry for up to an hour
for attempt in range(max_retries):
try:
get_resource = pd.read_csv(base_url)
break
except urllib.error.HTTPError:
time.sleep(60)
else:
raise Exception("Failed to fetch datasette after multiple attempts")
resource_list = get_resource["resource"].to_list()
# use resource list to get current entities
query = f"""select f.entity
from fact_resource fe join fact f on fe.fact=f.fact join entity e on f.entity=e.entity
where resource in ({','.join(f"'{x}'" for x in resource_list)})
group by reference
"""
rows = conn.execute(query).fetchall()
get_active_entities = [row[0] for row in rows]
# get entities from entity table to compare against resource entities
query = f"""
select entity from entity where organisation_entity = '{organisation_entity}';
"""
rows = conn.execute(query).fetchall()
get_entities = [row[0] for row in rows]
# identify entities present in the entity table but missing from the resource
entities = [item for item in get_entities if item not in get_active_entities]
actual = len(entities)
result = bool(actual == expected)
message = f"there were {actual} entities found"
details = {
"actual": actual,
"expected": expected,
"entities": entities,
}
return result, message, details
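

# A usage sketch for count_deleted_entities, kept as a comment. It assumes the
# connection's database file is named after the pipeline/dataset (e.g.
# conservation-area.sqlite3) so the datasette lookup above can resolve it; the
# organisation_entity value is an illustrative assumption.
#
#   result, message, details = count_deleted_entities(
#       conn,
#       expected=0,
#       organisation_entity=16,  # hypothetical organisation entity number
#   )
#   assert result, message
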
def check_columns(conn, expected: dict):
    """
    Checks that the db connection provided contains the tables with the
    expected columns.

    args:
        conn: sqlite connection to the db
        expected: a dictionary containing table names as keys, with a list of their expected columns as the value
    """
details = []
success_count = 0
failure_count = 0
    for table_name, expected_columns in expected.items():
sql = f"""
PRAGMA table_info({table_name})
"""
rows = conn.execute(sql).fetchall()
actual = [row[1] for row in rows]
success = set(expected_columns).issubset(set(actual))
missing = list(set(expected_columns) - set(actual))
details.append(
{
"table": table_name,
"success": success,
"missing": missing,
"actual": actual,
"expected": expected_columns,
}
)
if success:
success_count += 1
else:
failure_count += 1
    result = failure_count == 0
message = f"{success_count} out of {success_count + failure_count} tables had expected columns"
return result, message, details
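

if __name__ == "__main__":
    # A minimal, self-contained demo of check_columns against an in-memory
    # sqlite db. The table and columns here are illustrative assumptions,
    # not the real platform schema.
    import sqlite3

    demo_conn = sqlite3.connect(":memory:")
    demo_conn.execute("CREATE TABLE entity (entity, geometry, point)")
    result, message, details = check_columns(
        demo_conn, expected={"entity": ["entity", "geometry", "point"]}
    )
    print(result, message)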