
Commit e2b6924

Feat/parquet owen (#325)
* add convert-to-parquet function and run it in the pipeline
* correct imports
* correct arg
* current argument names
* correct input for csv to pq
* add parquet package back in
* tweak to remove temp table
* add updated tests
* make acceptance test work
* ensure no error if all orgs are blank
* add converted resources to the cache directory
* change to cache directory
* change to cache directory default
* use output path in other function
* use a specific path for the duck.db database to help with overflow
* make query entity-range specific
* add condition for no range
* update argument in wrong place
* start range in correct place
* get value from tuple
* correct max in for loop
* iterate file count
* build a single file
* remove cache_dir that's not provided
* make the right directory
* need to remove whole tree
* run full command
* change logging level
* make parquet package print info logs
* make integer
* include entity field change
* ensure where clause isn't used if no range is supplied
* use better logging
* logging tweaks
* add loading logging
* remove indexes before loading
* add create cursor
* remove error and some wrong validation
* change package structure
* align with proper convert changes
* fix convert tests
* update file for black with python 3.9
* remove custom_tmp_dir
* remove document and documentation-url as they've been removed from the spec
* correct test
* correct column mapping test
* correct lookup phase test
* fix test after changes to parquet structure
* remove tmp_dir
* try name change
1 parent 3b8fcb4 · commit e2b6924

23 files changed (+2032, -988 lines)

digital_land/cli.py (+23, -3)
@@ -147,6 +147,18 @@ def convert_cmd(input_path, output_path):
 @column_field_dir
 @dataset_resource_dir
 @issue_dir
+@click.option(
+    "--cache-dir",
+    type=click.Path(),
+    default="var/cache",
+    help="link to a cache directory to store temporary data that can be deleted once the process is finished",
+)
+@click.option(
+    "--resource-path",
+    type=click.Path(exists=True),
+    default="collection/resource.csv",
+    help="link to where the resource list is stored",
+)
 @click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
 @click.pass_context
 def dataset_create_cmd(
@@ -157,6 +169,8 @@ def dataset_create_cmd(
     column_field_dir,
     dataset_resource_dir,
     issue_dir,
+    cache_dir,
+    resource_path,
 ):
     return dataset_create(
         input_paths=input_paths,
@@ -168,6 +182,8 @@ def dataset_create_cmd(
         column_field_dir=column_field_dir,
         dataset_resource_dir=dataset_resource_dir,
         issue_dir=issue_dir,
+        cache_dir=cache_dir,
+        resource_path=resource_path,
     )


@@ -194,7 +210,11 @@ def dataset_dump_flattened_cmd(ctx, input_path, output_path):
 @click.option("--endpoints", help="list of endpoint hashes", default="")
 @click.option("--organisations", help="list of organisations", default="")
 @click.option("--entry-date", help="default entry-date value", default="")
-@click.option("--custom-temp-dir", help="default temporary directory", default=None)
+@click.option(
+    "--cache-dir",
+    help="cache directory to store converted files etc. in",
+    default="var/cache",
+)
 @click.option("--config-path", help="Path to a configuration sqlite", default=None)
 @click.option(
     "--resource",
@@ -224,7 +244,7 @@ def pipeline_command(
     endpoints,
     organisations,
     entry_date,
-    custom_temp_dir,
+    cache_dir,
     collection_dir,
     operational_issue_dir,
     config_path,
@@ -255,7 +275,7 @@ def pipeline_command(
         endpoints=endpoints,
         organisations=organisations,
         entry_date=entry_date,
-        custom_temp_dir=custom_temp_dir,
+        cache_dir=cache_dir,
         config_path=config_path,
         resource=resource,
         output_log_dir=output_log_dir,
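Taken together, these hunks replace the old --custom-temp-dir option with --cache-dir and add --resource-path to the dataset-create command. A minimal, standalone sketch of the same click option pattern follows; the command name and callback here are hypothetical and not part of digital_land's actual CLI:

import click
from pathlib import Path


@click.command("demo-dataset-create")
@click.option(
    "--cache-dir",
    type=click.Path(),
    default="var/cache",
    help="cache directory for temporary data",
)
@click.option(
    "--resource-path",
    type=click.Path(exists=True),  # click validates the file exists before the callback runs
    default="collection/resource.csv",
    help="where the resource list is stored",
)
def demo_cmd(cache_dir, resource_path):
    # click hands the option values over as plain strings; downstream code wraps them in Path
    click.echo(f"cache dir: {Path(cache_dir)}")
    click.echo(f"resource list: {Path(resource_path)}")


if __name__ == "__main__":
    demo_cmd()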

digital_land/commands.py (+81, -19)
@@ -28,7 +28,9 @@
     ConvertedResourceLog,
 )
 from digital_land.organisation import Organisation
+
 from digital_land.package.dataset import DatasetPackage
+from digital_land.package.dataset_parquet import DatasetParquetPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
@@ -55,6 +57,7 @@
 from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase
 from digital_land.phase.save import SavePhase
 from digital_land.pipeline import run_pipeline, Lookups, Pipeline
+from digital_land.pipeline.process import convert_tranformed_csv_to_pq
 from digital_land.schema import Schema
 from digital_land.update import add_source_endpoint
 from digital_land.configuration.main import Config
@@ -172,17 +175,17 @@ def collection_retire_endpoints_and_sources(
 #
 # pipeline commands
 #
-def convert(input_path, output_path, custom_temp_dir=None):
+def convert(input_path, output_path):
     if not output_path:
         output_path = default_output_path("converted", input_path)
     dataset_resource_log = DatasetResourceLog()
     converted_resource_log = ConvertedResourceLog()
+    # TBD this actually duplicates the data and does nothing else, should it just convert it?
     run_pipeline(
         ConvertPhase(
             input_path,
             dataset_resource_log=dataset_resource_log,
             converted_resource_log=converted_resource_log,
-            custom_temp_dir=custom_temp_dir,
         ),
         DumpPhase(output_path),
     )
@@ -201,10 +204,11 @@ def pipeline_run(
     operational_issue_dir="performance/operational_issue/",
     organisation_path=None,
     save_harmonised=False,
+    # TBD save all logs in a log directory, this will mean only one path passed in.
     column_field_dir=None,
     dataset_resource_dir=None,
     converted_resource_dir=None,
-    custom_temp_dir=None,  # TBD: rename to "tmpdir"
+    cache_dir="var/cache",
     endpoints=[],
     organisations=[],
     entry_date="",
@@ -213,6 +217,9 @@
     output_log_dir=None,
     converted_path=None,
 ):
+    # set up paths
+    cache_dir = Path(cache_dir)
+
     if resource is None:
         resource = resource_from_path(input_path)
     dataset = dataset
@@ -276,7 +283,6 @@
             path=input_path,
             dataset_resource_log=dataset_resource_log,
             converted_resource_log=converted_resource_log,
-            custom_temp_dir=custom_temp_dir,
             output_path=converted_path,
         ),
         NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
@@ -353,9 +359,18 @@
     issue_log.save(os.path.join(issue_dir, resource + ".csv"))
     issue_log.save_parquet(os.path.join(output_log_dir, "issue/"))
     operational_issue_log.save(output_dir=operational_issue_dir)
-    column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
+    if column_field_dir:
+        column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
     dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
     converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
+    # create converted parquet in the var directory
+    cache_dir = Path(organisation_path).parent
+    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+    transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
+    convert_tranformed_csv_to_pq(
+        input_path=output_path,
+        output_path=transformed_parquet_dir / f"{resource}.parquet",
+    )
 
 
 #
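The new pipeline_run code above writes a parquet copy of each transformed resource into the cache directory using convert_tranformed_csv_to_pq from digital_land.pipeline.process. That helper's implementation is not shown in this commit; a rough sketch of the CSV-to-parquet step it performs, assuming pyarrow and an illustrative function name:

# Sketch only: the real convert_tranformed_csv_to_pq lives in digital_land/pipeline/process.py
# and is not part of this diff; csv_to_parquet is an illustrative stand-in.
from pathlib import Path

import pyarrow.csv as pv
import pyarrow.parquet as pq


def csv_to_parquet(input_path: Path, output_path: Path) -> None:
    """Read a transformed CSV and rewrite it as a single parquet file."""
    table = pv.read_csv(input_path)  # load the whole CSV into an Arrow table
    output_path.parent.mkdir(parents=True, exist_ok=True)  # mirror the mkdir done in pipeline_run
    pq.write_table(table, output_path)  # one parquet file per resource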
@@ -371,42 +386,91 @@ def dataset_create(
     issue_dir="issue",
     column_field_dir="var/column-field",
     dataset_resource_dir="var/dataset-resource",
+    cache_dir="var/cache",
+    resource_path="collection/resource.csv",
 ):
+    # set level for logging to see what's going on
+    logger.setLevel(logging.INFO)
+    logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)
+
+    # check all paths are Path objects
+    issue_dir = Path(issue_dir)
+    column_field_dir = Path(column_field_dir)
+    dataset_resource_dir = Path(dataset_resource_dir)
+    cache_dir = Path(cache_dir)
+    resource_path = Path(resource_path)
+
+    # get the transformed files from the cache directory; this is assumed right now but we may want to be stricter in the future
+    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+
+    # create a directory for the dataset_parquet_package; a general provenance one for now
+    dataset_parquet_path = cache_dir / "provenance"
+
     if not output_path:
         print("missing output path", file=sys.stderr)
         sys.exit(2)
 
     # Set up initial objects
-    column_field_dir = Path(column_field_dir)
-    dataset_resource_dir = Path(dataset_resource_dir)
     organisation = Organisation(
         organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
     )
+
+    # create the sqlite dataset package as before and load in data that isn't in the parquet package yet
     package = DatasetPackage(
         dataset,
         organisation=organisation,
         path=output_path,
         specification_dir=None,  # TBD: package should use this specification object
     )
-    package.create()
+    # don't use create as we don't want to create the indexes
+    package.create_database()
+    package.disconnect()
     for path in input_paths:
         path_obj = Path(path)
-        package.load_transformed(path)
-        package.load_column_fields(column_field_dir / dataset / path_obj.name)
-        package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
-    package.load_entities()
-
-    old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
-    if os.path.exists(old_entity_path):
+        logging.info(f"loading column field log into {output_path}")
+        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+        logging.info(f"loading dataset resource log into {output_path}")
+        package.load_dataset_resource(
+            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
+        )
+    logging.info(f"loading old entities into {output_path}")
+    old_entity_path = Path(pipeline.path) / "old-entity.csv"
+    if old_entity_path.exists():
         package.load_old_entities(old_entity_path)
 
-    issue_paths = os.path.join(issue_dir, dataset)
-    if os.path.exists(issue_paths):
+    logging.info(f"loading issues into {output_path}")
+    issue_paths = issue_dir / dataset
+    if issue_paths.exists():
         for issue_path in os.listdir(issue_paths):
             package.load_issues(os.path.join(issue_paths, issue_path))
     else:
         logging.warning("No directory for this dataset in the provided issue_directory")
 
+    # Repeat for parquet
+    # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
+    if not os.path.exists(cache_dir):
+        os.makedirs(cache_dir)
+
+    pqpackage = DatasetParquetPackage(
+        dataset,
+        path=dataset_parquet_path,
+        specification_dir=None,  # TBD: package should use this specification object
+        duckdb_path=cache_dir / "overflow.duckdb",
+    )
+    pqpackage.load_facts(transformed_parquet_dir)
+    pqpackage.load_fact_resource(transformed_parquet_dir)
+    pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
+
+    logger.info(f"loading fact, fact_resource and entity into {output_path}")
+    pqpackage.load_to_sqlite(output_path)
+
+    logger.info(f"add indexes to {output_path}")
+    package.connect()
+    package.create_cursor()
+    package.create_indexes()
+    package.disconnect()
+
+    logger.info(f"creating dataset package {output_path} counts")
     package.add_counts()
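dataset_create now builds the fact, fact_resource and entity tables as parquet via DatasetParquetPackage (backed by a DuckDB database at cache_dir / "overflow.duckdb") and copies them into the sqlite package with load_to_sqlite before the indexes are added. The package internals are not shown in this commit; a simplified sketch of a parquet-to-sqlite copy, assuming duckdb and sqlite3, with illustrative table and column handling:

# Sketch only: DatasetParquetPackage's real load_to_sqlite is not part of this diff.
import sqlite3

import duckdb


def load_parquet_into_sqlite(parquet_glob: str, sqlite_path: str, table: str) -> None:
    # the package uses an on-disk file (overflow.duckdb) rather than an in-memory database
    duck = duckdb.connect()
    rows = duck.execute(f"SELECT * FROM read_parquet('{parquet_glob}')").fetchall()
    columns = [col[0] for col in duck.description]

    # assumes the target table already exists (created by package.create_database() above)
    con = sqlite3.connect(sqlite_path)
    placeholders = ", ".join("?" for _ in columns)
    con.executemany(
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})",
        rows,
    )
    con.commit()
    con.close()
    duck.close()

Because this sketch pulls everything through fetchall() it holds the rows in memory; the actual package keeps the intermediate data in DuckDB on disk, which is why a specific duckdb_path is passed in above.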
@@ -1148,7 +1212,6 @@ def get_resource_unidentified_lookups(
     # could alter resource_from_path to file from path and promote to a utils folder
     resource = resource_from_path(input_path)
     dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
-    custom_temp_dir = tmp_dir  # './var'
 
     print("")
     print("----------------------------------------------------------------------")
@@ -1202,7 +1265,6 @@ def get_resource_unidentified_lookups(
         ConvertPhase(
             path=input_path,
             dataset_resource_log=dataset_resource_log,
-            custom_temp_dir=custom_temp_dir,
         ),
         NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
         ParsePhase(),
