Commit 320ef2e

Merge branch 'main' into feat/parquet_owen
Merge changes made to the main branch into this branch.
2 parents bf3566b + 3b8fcb4 commit 320ef2e

File tree

11 files changed (+166, -28 lines)

digital_land/cli.py (+11 -4)

@@ -84,11 +84,17 @@ def fetch_cmd(ctx, url):
     type=click.Path(exists=True),
     default="collection/endpoint.csv",
 )
+@click.option("--refill-todays-logs", default=False)
 @collection_dir
 @click.pass_context
-def collect_cmd(ctx, endpoint_path, collection_dir):
+def collect_cmd(ctx, endpoint_path, collection_dir, refill_todays_logs):
     """fetch resources from collection endpoints"""
-    return collect(endpoint_path, collection_dir, ctx.obj["PIPELINE"])
+    return collect(
+        endpoint_path,
+        collection_dir,
+        ctx.obj["PIPELINE"],
+        refill_todays_logs=refill_todays_logs,
+    )


 #
@@ -110,9 +116,10 @@ def collection_pipeline_makerules_cmd(collection_dir):


 @cli.command("collection-save-csv", short_help="save collection as CSV package")
+@click.option("--refill-todays-logs", default=False)
 @collection_dir
-def collection_save_csv_cmd(collection_dir):
-    return collection_save_csv(collection_dir)
+def collection_save_csv_cmd(collection_dir, refill_todays_logs):
+    return collection_save_csv(collection_dir, refill_todays_logs)


 @cli.command(
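
Note: because the new option is declared with default=False and no explicit type or is_flag, click infers a BOOL parameter from the default, so it takes a value rather than acting as a bare flag. A minimal usage sketch via click's test runner (the cli group import path comes from this file; the prepared collection directory is assumed):

    from click.testing import CliRunner
    from digital_land.cli import cli

    runner = CliRunner()
    # click converts "True" to a boolean because the option's default is False
    result = runner.invoke(
        cli, ["collection-save-csv", "--refill-todays-logs", "True"]
    )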

digital_land/collect.py (+18 -11)

@@ -51,18 +51,23 @@ def log_path(self, log_datetime, endpoint):
         log_date = log_datetime.isoformat()[:10]
         return os.path.join(self.log_dir, log_date, endpoint + ".json")

-    def save_log(self, path, log):
-        self.save(path, canonicaljson.encode_canonical_json(log))
+    def save_log(self, path, log, refill_todays_logs=False):
+        self.save(
+            path,
+            canonicaljson.encode_canonical_json(log),
+            refill_todays_logs=refill_todays_logs,
+        )

     def save_content(self, content):
         resource = hashlib.sha256(content).hexdigest()
         path = os.path.join(self.resource_dir, resource)
         self.save(path, content)
         return resource

-    def save(self, path, data):
+    def save(self, path, data, refill_todays_logs=False):
         os.makedirs(os.path.dirname(path), exist_ok=True)
-        if not os.path.exists(path):
+        # if refill_todays_logs=True then files in log_path need to be overwritten
+        if not os.path.exists(path) or refill_todays_logs:
             logging.info(path)
             with open(path, "wb") as f:
                 f.write(data)
@@ -126,6 +131,7 @@ def fetch(
         log_datetime=datetime.utcnow(),
         end_date="",
         plugin="",
+        refill_todays_logs=False,
     ):
         if end_date and datetime.strptime(end_date, "%Y-%m-%d") < log_datetime:
             return FetchStatus.EXPIRED
@@ -139,11 +145,12 @@ def fetch(
             )
             return FetchStatus.HASH_FAILURE

-        # fetch each source at most once per-day
+        # fetch each source at most once per-day, though with an option to re-collect the latest day's sources
         log_path = self.log_path(log_datetime, endpoint)
-        if os.path.isfile(log_path):
-            logging.debug(f"{log_path} exists")
-            return FetchStatus.ALREADY_FETCHED
+        if not refill_todays_logs:
+            if os.path.isfile(log_path):
+                logging.debug(f"{log_path} exists")
+                return FetchStatus.ALREADY_FETCHED

         log = {
             "endpoint-url": url,
@@ -167,8 +174,7 @@ def fetch(
         log["elapsed"] = str(round(timer() - start, 3))

         status = self.save_resource(content, log_path, log)
-
-        self.save_log(log_path, log)
+        self.save_log(log_path, log, refill_todays_logs=refill_todays_logs)
         return status

     def save_resource(self, content, url, log):
@@ -182,7 +188,7 @@ def save_resource(self, content, url, log):

         return FetchStatus.FAILED

-    def collect(self, endpoint_path):
+    def collect(self, endpoint_path, refill_todays_logs=False):
         for row in csv.DictReader(open(endpoint_path, newline="")):
             endpoint = row["endpoint"]
             url = row["endpoint-url"]
@@ -197,4 +203,5 @@ def collect(self, endpoint_path):
                 endpoint=endpoint,
                 end_date=row.get("end-date", ""),
                 plugin=plugin,
+                refill_todays_logs=refill_todays_logs,
             )
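
Taken together, a sketch of the resulting behaviour (constructor arguments mirror digital_land/commands.py; the dataset name and URL are placeholders, and the HTTP call would need to be mocked as the unit tests do with the responses library):

    from pathlib import Path
    from digital_land.collect import Collector, FetchStatus

    collector = Collector("my-dataset", Path("collection"))

    # the first fetch of the day writes collection/log/YYYY-MM-DD/<endpoint>.json
    assert collector.fetch("http://some.url") == FetchStatus.OK

    # a repeat fetch the same day is normally short-circuited
    assert collector.fetch("http://some.url") == FetchStatus.ALREADY_FETCHED

    # refill_todays_logs skips that check and lets save() overwrite today's log
    assert collector.fetch("http://some.url", refill_todays_logs=True) == FetchStatus.OK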

digital_land/collection.py (+4 -2)

@@ -354,7 +354,7 @@ def save_csv(self, directory=None):
         self.log.save_csv(directory=directory)
         self.resource.save_csv(directory=directory)

-    def load(self, directory=None):
+    def load(self, directory=None, refill_todays_logs=False):
         directory = directory or self.dir
         self.source.load(directory=directory)
         self.endpoint.load(directory=directory)
@@ -363,7 +363,9 @@

         # Try to load log store from csv first
         try:
-            self.log.load_csv(directory=directory)
+            self.log.load_csv(
+                directory=directory, refill_todays_logs=refill_todays_logs
+            )
             logging.info(f"Log loaded from CSV - {len(self.log.entries)} entries")
         except FileNotFoundError:
             logging.info("No log.csv - building from log items")

digital_land/commands.py (+4 -4)

@@ -86,10 +86,10 @@ def fetch(url, pipeline):
     collector.fetch(url)


-def collect(endpoint_path, collection_dir, pipeline):
+def collect(endpoint_path, collection_dir, pipeline, refill_todays_logs=False):
     """fetch the sources listed in the endpoint-url column of the ENDPOINT_PATH CSV file"""
     collector = Collector(pipeline.name, Path(collection_dir))
-    collector.collect(endpoint_path)
+    collector.collect(endpoint_path, refill_todays_logs=refill_todays_logs)


 #
@@ -109,9 +109,9 @@ def collection_pipeline_makerules(collection_dir):
     collection.pipeline_makerules()


-def collection_save_csv(collection_dir):
+def collection_save_csv(collection_dir, refill_todays_logs=False):
     collection = Collection(name=None, directory=collection_dir)
-    collection.load()
+    collection.load(refill_todays_logs=refill_todays_logs)
     collection.update()
     collection.save_csv()

digital_land/expectations/operation.py (+14 -2)

@@ -2,6 +2,7 @@
 import pandas as pd
 import urllib
 import os
+import time


 # # TODO is there a way to represent this in a generalised count or not
@@ -39,7 +40,7 @@ def count_lpa_boundary(
         lpa_geometry = data["geometry"]
     except requests.exceptions.RequestException as err:
         passed = False
-        message = f"An error occured when retrieving lpa geometry from platform {err}"
+        message = f"An error occurred when retrieving lpa geometry from platform {err}"
         details = {}
         return passed, message, details

@@ -142,7 +143,18 @@ def count_deleted_entities(
         }
     )
     base_url = f"https://datasette.planning.data.gov.uk/digital-land.csv?{params}"
-    get_resource = pd.read_csv(base_url)
+
+    # fetching from datasette can fail intermittently; if so, wait a minute and retry
+    max_retries = 60  # Retry for an hour
+    for attempt in range(max_retries):
+        try:
+            get_resource = pd.read_csv(base_url)
+            break
+        except urllib.error.HTTPError:
+            time.sleep(60)
+    else:
+        raise Exception("Failed to fetch datasette after multiple attempts")
+
     resource_list = get_resource["resource"].to_list()

     # use resource list to get current entities
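
The retry relies on Python's for/else: the else branch runs only when the loop finishes without hitting break, i.e. when every attempt raised HTTPError. A standalone sketch of the same pattern (the function name and parameters are illustrative, not part of this commit):

    import time
    import urllib.error

    import pandas as pd


    def read_csv_with_retry(url, max_retries=60, wait_seconds=60):
        for attempt in range(max_retries):
            try:
                return pd.read_csv(url)
            except urllib.error.HTTPError:
                time.sleep(wait_seconds)
        # reached only when all attempts failed; with an early return this
        # trailing raise is equivalent to the diff's for/else
        raise Exception("Failed to fetch datasette after multiple attempts")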

digital_land/phase/patch.py (+2)

@@ -16,6 +16,8 @@ def __init__(
     def apply_patch(self, fieldname, value):
         patches = {**self.patch.get(fieldname, {}), **self.patch.get("", {})}
         for pattern, replacement in patches.items():
+            if pattern == value:
+                pattern = f"^{re.escape(pattern)}$"
             match = re.match(pattern, value, flags=re.IGNORECASE)
             if match:
                 newvalue = match.expand(replacement)
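
The guard handles patch keys that are meant as literal strings but contain regex metacharacters: a URL key with "?" would otherwise fail to match itself, because re.match treats "?" as a quantifier. A small sketch using the value from the new unit test below:

    import re

    pattern = "https://example.com/search?query=data&filter=name%20contains%20test"
    value = pattern

    # unescaped, the "?" makes the preceding "h" optional and the match fails
    assert re.match(pattern, value, flags=re.IGNORECASE) is None

    # escaping and anchoring turns the key into an exact, case-insensitive match
    escaped = f"^{re.escape(pattern)}$"
    assert re.match(escaped, value, flags=re.IGNORECASE) is not None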

digital_land/store/csv.py (+14 -4)

@@ -4,19 +4,29 @@
 import csv
 import logging
 from pathlib import Path
+from datetime import datetime
 from .memory import MemoryStore


 class CSVStore(MemoryStore):
     def csv_path(store, directory=""):
         return Path(directory) / (store.schema.name + ".csv")

-    def load_csv(self, path=None, directory=""):
+    def load_csv(self, path=None, directory="", refill_todays_logs=False):
         path = path or self.csv_path(directory)
-        logging.debug("loading %s" % (path))
+        today = datetime.now().date()
+        logging.debug("loading %s" % path)
         reader = csv.DictReader(open(path, newline=""))
         for row in reader:
-            self.add_entry(row)
+            if not refill_todays_logs:
+                self.add_entry(row)
+            else:
+                # Don't load today's log entries so they can be overwritten
+                if (
+                    "entry-date" in row
+                    and datetime.fromisoformat(row["entry-date"]).date() < today
+                ):
+                    self.add_entry(row)

     def load(self, *args, **kwargs):
         self.load_csv(*args, **kwargs)
@@ -28,7 +38,7 @@ def save_csv(self, path=None, directory="", entries=None):
             entries = self.entries

         os.makedirs(os.path.dirname(path), exist_ok=True)
-        logging.debug("saving %s" % (path))
+        logging.debug("saving %s" % path)
         f = open(path, "w", newline="")
         writer = csv.DictWriter(
             f, fieldnames=self.schema.fieldnames, extrasaction="ignore"
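
The date filter keeps only rows strictly older than today, so a re-run can regenerate today's entries. A quick sketch of the comparison (the entry-date values are illustrative; datetime.fromisoformat accepts the ISO strings the collector writes):

    from datetime import datetime

    today = datetime.now().date()

    old_row = {"entry-date": "2019-01-01T00:00:00"}
    new_row = {"entry-date": datetime.now().isoformat()}

    # kept: dated before today
    assert datetime.fromisoformat(old_row["entry-date"]).date() < today
    # skipped: dated today, so a fresh collection run can rewrite it
    assert not datetime.fromisoformat(new_row["entry-date"]).date() < today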

setup.py (+1)

@@ -55,6 +55,7 @@ def get_long_description():
         "json-stream",
         "duckdb",
         "dask",
+        "dask[dataframe]",
         "pyarrow",
         "pygit2",
     ],

tests/integration/test_collection.py (+69)

@@ -426,6 +426,75 @@ def test_collection_update_today(test_collection_update_fixture):
     assert collection.resource.entries[0]["end-date"] == ""


+def test_collection_update_refill_todays_logs(tmp_path):
+    collection_dir = os.path.join(tmp_path, "collection")
+    os.makedirs(collection_dir, exist_ok=True)
+
+    # Write the existing log and resource file
+    _write_csv(
+        dir=collection_dir,
+        log={
+            "bytes": "2",
+            "content-type": "",
+            "elapsed": "0.5",
+            "endpoint": "test",
+            "resource": "test",
+            "status": "200",
+            "entry-date": datetime.datetime.now().isoformat(),
+            "start-date": datetime.datetime.now().isoformat(),
+            "end-date": "",
+            "exception": "",
+        },
+        resource={
+            "resource": "test",
+            "bytes": "2",
+            "organisations": "test",
+            "datasets": "test",
+            "endpoints": "test",
+            "start-date": "2019-01-01",
+            "end-date": "",
+        },
+    )
+
+    # Write the endpoint/source for the new log item
+    _write_csv(
+        dir=collection_dir,
+        endpoint={
+            "endpoint": "test",
+            "endpoint-url": "test.com",
+            "parameters": "",
+            "plugin": "",
+            "entry-date": "2019-01-01",
+            "start-date": "2019-01-01",
+            "end-date": "",
+        },
+        source={
+            "source": "test1",
+            "attribution": "",
+            "collection": "test",
+            "documentation-url": "testing.com",
+            "endpoint": "test",
+            "licence": "test",
+            "organisation": "test-org",
+            "pipelines": "test",
+            "entry-date": "2019-01-01",
+            "start-date": "2019-01-01",
+            "end-date": "",
+        },
+    )
+
+    collection = Collection(directory=collection_dir)
+
+    # Load from CSVs
+    # With refill_todays_logs=True it shouldn't load today's log
+    collection.load(refill_todays_logs=True)
+    assert len(collection.log.entries) == 0
+
+    # While False it should load today's log as normal
+    collection.load(refill_todays_logs=False)
+    assert len(collection.log.entries) == 1
+
+
 def test_collection_retire_endpoints_and_sources(tmp_path):

     # Create a temporary directory for the test collection

tests/unit/test_collect.py (+9)

@@ -59,6 +59,15 @@ def test_already_fetched(collector, prepared_response):
     assert new_status == FetchStatus.ALREADY_FETCHED


+@responses.activate
+def test_refill_todays_logs(collector, prepared_response):
+    status = collector.fetch("http://some.url")
+    assert status == FetchStatus.OK
+
+    new_status = collector.fetch("http://some.url", refill_todays_logs=True)
+    assert new_status == FetchStatus.OK
+
+
 @responses.activate
 def test_expired(collector):
     yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

tests/unit/test_patch.py (+20 -1)

@@ -14,7 +14,10 @@ def test_patch_regex():
             "^2\\*$": "II*",
             "^2 Star$": "II*",
             "^3$": "III",
-        }
+        },
+        "OrganisationURI": {
+            "https://example.com/search?query=data&filter=name%20contains%20test": "patch_organisation",
+        },
     }

     p = PatchPhase(patches=patches, issues=issues)
@@ -48,3 +51,19 @@ def test_patch_regex():
     assert issue["issue-type"] == "patch"
     assert issue["value"] == "2 Star"
     assert issues.rows == []
+
+    assert (
+        p.apply_patch(
+            "OrganisationURI",
+            "https://example.com/search?query=data&filter=name%20contains%20test",
+        )
+        == "patch_organisation"
+    )
+    issue = issues.rows.pop()
+    assert issue["field"] == "OrganisationURI"
+    assert issue["issue-type"] == "patch"
+    assert (
+        issue["value"]
+        == "https://example.com/search?query=data&filter=name%20contains%20test"
+    )
+    assert issues.rows == []
