
Commit 18df7da

Feat/incre load new resources (#359)
* New function 'set_up_connection' for use with 'dataset_update'
* Added 'load_from_s3' for use with 'dataset_update'
* Reformatting
* Removed unneeded comment
* Added boto3 and moto
* Added tests mocking dataset_update
* Test for the dataset_create & dataset_update commands
* Moved tests to tests/integration/test_commands.py
* Added 'dataset_update' function
* Added test for package.insert
* Changed function names to start with test
* black edits
* Updated S3 table_name
1 parent 122b3fb commit 18df7da

5 files changed: +783 -3 lines changed

digital_land/commands.py

+67 -1

@@ -422,7 +422,7 @@ def dataset_create(
         path=output_path,
         specification_dir=None,  # TBD: package should use this specification object
     )
-    # don'tt use create as we don't want to create the indexes
+    # don't use create as we don't want to create the indexes
     package.create_database()
     package.disconnect()
     for path in input_paths:
@@ -474,6 +474,72 @@ def dataset_create(
     package.add_counts()


+#
+# update dataset from processed new resources
+#
+def dataset_update(
+    input_paths,
+    output_path,
+    organisation_path,
+    pipeline,
+    dataset,
+    specification,
+    bucket_name=None,  # bucket name from bash script, need to put into cli.
+    object_key=None,  # object-key, latter part of 'bucket'
+    issue_dir="issue",
+    column_field_dir="var/column-field",
+    dataset_resource_dir="var/dataset-resource",
+):
+    """
+    Updates the current state of the sqlite files being held in S3 with new resources
+    """
+    if not output_path:
+        print("missing output path", file=sys.stderr)
+        sys.exit(2)
+
+    if not bucket_name or not object_key:
+        print("Missing bucket name or object_key to get sqlite files", file=sys.stderr)
+        sys.exit(2)
+
+    # Set up initial objects
+    column_field_dir = Path(column_field_dir)
+    dataset_resource_dir = Path(dataset_resource_dir)
+    organisation = Organisation(
+        organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
+    )
+    package = DatasetPackage(
+        dataset,
+        organisation=organisation,
+        path=output_path,
+        specification_dir=None,  # TBD: package should use this specification object
+    )
+    # Copy files from S3 and load into tables
+    table_name = dataset
+    package.load_from_s3(
+        bucket_name=bucket_name, object_key=object_key, table_name=table_name
+    )
+
+    for path in input_paths:
+        path_obj = Path(path)
+        package.load_transformed(path)
+        package.load_column_fields(column_field_dir / dataset / path_obj.name)
+        package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
+    package.load_entities()
+
+    old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
+    if os.path.exists(old_entity_path):
+        package.load_old_entities(old_entity_path)
+
+    issue_paths = os.path.join(issue_dir, dataset)
+    if os.path.exists(issue_paths):
+        for issue_path in os.listdir(issue_paths):
+            package.load_issues(os.path.join(issue_paths, issue_path))
+    else:
+        logging.warning("No directory for this dataset in the provided issue_directory")
+
+    package.add_counts()
+
+
 def dataset_dump(input_path, output_path):
     cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}"
     logging.info(cmd)

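For orientation, dataset_update mirrors dataset_create but first seeds the local sqlite file from S3 before loading the newly processed resources. Below is a minimal sketch of calling it directly; the paths, bucket name and object key are illustrative, and the Pipeline/Specification construction is assumed to follow the existing dataset_create callers (the commit notes the bucket name still needs to be wired into the CLI).

# Illustrative only: paths, bucket and object key are placeholders, not values from this commit.
from digital_land.commands import dataset_update
from digital_land.pipeline import Pipeline              # assumed import, as used by other commands
from digital_land.specification import Specification    # assumed import

dataset = "conservation-area"
pipeline = Pipeline("pipeline/", dataset)                # assumed constructor signature
specification = Specification("specification/")          # assumed constructor signature

dataset_update(
    input_paths=["transformed/conservation-area/a-new-resource.csv"],  # newly processed resource(s)
    output_path=f"dataset/{dataset}.sqlite3",
    organisation_path="var/cache/organisation.csv",      # illustrative path
    pipeline=pipeline,
    dataset=dataset,
    specification=specification,
    bucket_name="my-collection-bucket",                  # hypothetical bucket holding the current sqlite files
    object_key="dataset",                                # hypothetical prefix; load_from_s3 appends "/{dataset}.sqlite3"
)
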
digital_land/package/sqlite.py

+35 -2

@@ -3,6 +3,8 @@
 import csv
 import sqlite3
 import logging
+import boto3
+import botocore.exceptions
 from .package import Package
 from decimal import Decimal
 
@@ -298,15 +300,46 @@ def create_database(self):
         if os.path.exists(self.path):
             os.remove(self.path)
 
+        self.set_up_connection()
+
+        self.create_tables()
+
+    def set_up_connection(self):
         self.connect()
 
         if self._spatialite:
             self.connection.execute("select InitSpatialMetadata(1)")
 
-        self.create_tables()
-
     def create(self):
         self.create_database()
         self.load()
         self.create_indexes()
         self.disconnect()
+
+    def load_from_s3(self, bucket_name, object_key, table_name):
+        # Ensure parameters are valid
+        if not isinstance(bucket_name, str) or not isinstance(object_key, str):
+            raise ValueError("Bucket name and object key must be strings.")
+
+        local_path = os.path.dirname(self.path)
+        s3 = boto3.client("s3")
+
+        file_key = f"{table_name}.sqlite3"
+        local_file_path = os.path.join(local_path, file_key)
+
+        try:
+            os.makedirs(local_path, exist_ok=True)  # Ensure local directory exists
+            s3.download_file(bucket_name, object_key + "/" + file_key, local_file_path)
+        except botocore.exceptions.NoCredentialsError:
+            logger.error(
+                "❌ AWS credentials not found. Run `aws configure` to set them."
+            )
+        except botocore.exceptions.ParamValidationError as e:
+            logger.error(f"❌ Parameter validation error: {e}")
+        except botocore.exceptions.ClientError as e:
+            logger.error(f"❌ AWS S3 error: {e}")
+
+        self.set_up_connection()
+        self.load()
+        # self.create_indexes()  # Do we need this?
+        self.disconnect()

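Since setup.py (below) now pulls in boto3 and moto, the S3 download path can be exercised without real AWS credentials. Here is a minimal moto sketch of the bucket layout load_from_s3 expects; the bucket name and object key are hypothetical, and mock_aws assumes moto >= 5 (older releases expose mock_s3 instead).

# Hypothetical test sketch: seeds a fake bucket with the key layout that
# load_from_s3 downloads, i.e. f"{object_key}/{table_name}.sqlite3".
import boto3
from moto import mock_aws  # moto >= 5; use `from moto import mock_s3` on older versions


@mock_aws
def test_bucket_layout_for_load_from_s3(tmp_path):
    dataset = "conservation-area"
    bucket, object_key = "collection-data", "dataset"  # placeholder names

    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)
    s3.put_object(Bucket=bucket, Key=f"{object_key}/{dataset}.sqlite3", Body=b"sqlite bytes")

    # Mirrors the download performed inside load_from_s3.
    local_file = tmp_path / f"{dataset}.sqlite3"
    s3.download_file(bucket, f"{object_key}/{dataset}.sqlite3", str(local_file))
    assert local_file.exists()
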
setup.py

+2 -0

@@ -58,6 +58,8 @@ def get_long_description():
         "dask[dataframe]",
         "pyarrow",
         "pygit2",
+        "boto3",
+        "moto",
     ],
     entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]},
     setup_requires=["pytest-runner"],

tests/integration/package/test_dataset.py

+112 -0

@@ -371,6 +371,7 @@ def test_entry_date_upsert_uploads_blank_fields(
     # run upload to fact table not fact resource for testing the upsert
     package.connect()
     package.create_cursor()
+
     fact_fields = package.specification.schema["fact"]["fields"]
     fact_conflict_fields = ["fact"]
     fact_update_fields = [
@@ -406,3 +407,114 @@ def test_entry_date_upsert_uploads_blank_fields(
     ]
 
     assert actual_result == expected_result, "actual result does not match query"
+
+
+def test_insert_newest_date(
+    specification_dir,
+    organisation_csv,
+    blank_patch_csv,
+    transformed_fact_resources,
+    transformed_fact_resources_with_blank,
+    tmp_path,
+):
+    dataset = "conservation-area"
+    sqlite3_path = os.path.join(tmp_path, f"{dataset}.sqlite3")
+
+    organisation = Organisation(
+        organisation_path=organisation_csv,
+        pipeline_dir=Path(os.path.dirname(blank_patch_csv)),
+    )
+    package = DatasetPackage(
+        "conservation-area",
+        organisation=organisation,
+        path=sqlite3_path,
+        specification_dir=specification_dir,  # TBD: package should use this specification object
+    )
+
+    # create package
+    package.create()
+
+    # run upload to fact table not fact resource for testing the upsert
+    package.connect()
+    package.create_cursor()
+
+    fact_resource_fields = package.specification.schema["fact-resource"]["fields"]
+    for row in transformed_fact_resources:
+        package.insert("fact-resource", fact_resource_fields, row, upsert=True)
+    package.commit()
+    package.disconnect()
+
+    # retrieve results
+    package.connect()
+    package.create_cursor()
+    package.cursor.execute("SELECT * FROM fact_resource;")
+    cols = [column[0] for column in package.cursor.description]
+    actual_result = pd.DataFrame.from_records(
+        package.cursor.fetchall(), columns=cols
+    ).to_dict(orient="records")
+    expected_result = [
+        {
+            "end_date": "",
+            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
+            "entry_date": "2021-09-06",
+            "entry_number": None,
+            "priority": None,
+            "resource": "",
+            "start_date": "",
+        },
+        {
+            "end_date": "",
+            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
+            "entry_date": "2022-11-02",
+            "entry_number": None,
+            "priority": None,
+            "resource": "",
+            "start_date": "",
+        },
+    ]
+
+    assert actual_result == expected_result, "actual result does not match query"
+
+    # create package
+    package.create()
+
+    # run upload to fact table not fact resource for testing the upsert
+    package.connect()
+    package.create_cursor()
+
+    fact_resource_fields = package.specification.schema["fact-resource"]["fields"]
+    for row in transformed_fact_resources_with_blank:
+        package.insert("fact-resource", fact_resource_fields, row, upsert=True)
+    package.commit()
+    package.disconnect()
+
+    # retrieve results
+    package.connect()
+    package.create_cursor()
+    package.cursor.execute("SELECT * FROM fact_resource;")
+    cols = [column[0] for column in package.cursor.description]
+    actual_result = pd.DataFrame.from_records(
+        package.cursor.fetchall(), columns=cols
+    ).to_dict(orient="records")
+    expected_result = [
+        {
+            "end_date": "2021-12-31",
+            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
+            "entry_date": "2021-09-06",
+            "entry_number": None,
+            "priority": None,
+            "resource": "",
+            "start_date": "",
+        },
+        {
+            "end_date": "",
+            "fact": "1f90248fd06e49accd42b80e43d58beeac300f942f1a9f71da4b64865356b1f3",
+            "entry_date": "2022-11-02",
+            "entry_number": None,
+            "priority": None,
+            "resource": "",
+            "start_date": "",
+        },
+    ]
+
+    assert actual_result == expected_result, "actual result does not match query"

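The new test sits alongside the existing DatasetPackage integration tests; assuming the repository's usual pytest setup, it can be run on its own with "python -m pytest tests/integration/package/test_dataset.py::test_insert_newest_date".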