@@ -28,7 +28,9 @@
     ConvertedResourceLog,
 )
 from digital_land.organisation import Organisation
+
 from digital_land.package.dataset import DatasetPackage
+from digital_land.package.dataset_parquet import DatasetParquetPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
@@ -55,6 +57,7 @@
 from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase
 from digital_land.phase.save import SavePhase
 from digital_land.pipeline import run_pipeline, Lookups, Pipeline
+from digital_land.pipeline.process import convert_tranformed_csv_to_pq
 from digital_land.schema import Schema
 from digital_land.update import add_source_endpoint
 from digital_land.configuration.main import Config
@@ -172,17 +175,17 @@ def collection_retire_endpoints_and_sources(
 #
 # pipeline commands
 #
-def convert(input_path, output_path, custom_temp_dir=None):
+def convert(input_path, output_path):
     if not output_path:
         output_path = default_output_path("converted", input_path)
     dataset_resource_log = DatasetResourceLog()
     converted_resource_log = ConvertedResourceLog()
+    # TBD: this actually duplicates the data and does nothing else, should it just convert it?
     run_pipeline(
         ConvertPhase(
             input_path,
             dataset_resource_log=dataset_resource_log,
             converted_resource_log=converted_resource_log,
-            custom_temp_dir=custom_temp_dir,
         ),
         DumpPhase(output_path),
     )
@@ -201,10 +204,11 @@ def pipeline_run(
     operational_issue_dir="performance/operational_issue/",
     organisation_path=None,
     save_harmonised=False,
+    # TBD: save all logs in a log directory, so only one path is passed in.
     column_field_dir=None,
     dataset_resource_dir=None,
     converted_resource_dir=None,
-    custom_temp_dir=None,  # TBD: rename to "tmpdir"
+    cache_dir="var/cache",
     endpoints=[],
     organisations=[],
     entry_date="",
@@ -213,6 +217,9 @@ def pipeline_run(
     output_log_dir=None,
     converted_path=None,
 ):
+    # set up paths
+    cache_dir = Path(cache_dir)
+
     if resource is None:
         resource = resource_from_path(input_path)
     dataset = dataset
@@ -276,7 +283,6 @@ def pipeline_run(
             path=input_path,
             dataset_resource_log=dataset_resource_log,
             converted_resource_log=converted_resource_log,
-            custom_temp_dir=custom_temp_dir,
             output_path=converted_path,
         ),
         NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
@@ -353,9 +359,18 @@ def pipeline_run(
     issue_log.save(os.path.join(issue_dir, resource + ".csv"))
     issue_log.save_parquet(os.path.join(output_log_dir, "issue/"))
     operational_issue_log.save(output_dir=operational_issue_dir)
-    column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
+    if column_field_dir:
+        column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
     dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
     converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
+    # create converted parquet in the var directory
+    cache_dir = Path(organisation_path).parent
+    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+    transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
+    convert_tranformed_csv_to_pq(
+        input_path=output_path,
+        output_path=transformed_parquet_dir / f"{resource}.parquet",
+    )
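+    # dataset_create later reads these parquet files back from the cache directory's transformed_parquet folder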
 
 
 #
@@ -371,42 +386,91 @@ def dataset_create(
     issue_dir="issue",
     column_field_dir="var/column-field",
     dataset_resource_dir="var/dataset-resource",
+    cache_dir="var/cache",
+    resource_path="collection/resource.csv",
 ):
+    # set the logging level to see what's going on
+    logger.setLevel(logging.INFO)
+    logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)
+
+    # check all paths are Path objects
+    issue_dir = Path(issue_dir)
+    column_field_dir = Path(column_field_dir)
+    dataset_resource_dir = Path(dataset_resource_dir)
+    cache_dir = Path(cache_dir)
+    resource_path = Path(resource_path)
+
+    # get the transformed files from the cache directory; this is assumed right now but we may want to be stricter in the future
+    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+
+    # create directory for dataset_parquet_package, will create a general provenance one for now
+    dataset_parquet_path = cache_dir / "provenance"
+
     if not output_path:
         print("missing output path", file=sys.stderr)
         sys.exit(2)
 
     # Set up initial objects
-    column_field_dir = Path(column_field_dir)
-    dataset_resource_dir = Path(dataset_resource_dir)
     organisation = Organisation(
         organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
     )
+
+    # create the sqlite dataset package as before and load in the data that isn't in the parquet package yet
     package = DatasetPackage(
         dataset,
         organisation=organisation,
         path=output_path,
         specification_dir=None,  # TBD: package should use this specification object
     )
-    package.create()
+    # don't use create() as we don't want to create the indexes yet
+    package.create_database()
+    package.disconnect()
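+    # indexes are created later (after the parquet data has been loaded into sqlite), see create_indexes below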
     for path in input_paths:
         path_obj = Path(path)
-        package.load_transformed(path)
-        package.load_column_fields(column_field_dir / dataset / path_obj.name)
-        package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
-    package.load_entities()
-
-    old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
-    if os.path.exists(old_entity_path):
+        logging.info(f"loading column field log into {output_path}")
+        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+        logging.info(f"loading dataset resource log into {output_path}")
+        package.load_dataset_resource(
+            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
+        )
+    logging.info(f"loading old entities into {output_path}")
+    old_entity_path = Path(pipeline.path) / "old-entity.csv"
+    if old_entity_path.exists():
         package.load_old_entities(old_entity_path)
 
-    issue_paths = os.path.join(issue_dir, dataset)
-    if os.path.exists(issue_paths):
+    logging.info(f"loading issues into {output_path}")
+    issue_paths = issue_dir / dataset
+    if issue_paths.exists():
         for issue_path in os.listdir(issue_paths):
             package.load_issues(os.path.join(issue_paths, issue_path))
     else:
         logging.warning("No directory for this dataset in the provided issue_directory")
 
+    # Repeat for parquet
+    # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
+    if not os.path.exists(cache_dir):
+        os.makedirs(cache_dir)
+
+    pqpackage = DatasetParquetPackage(
+        dataset,
+        path=dataset_parquet_path,
+        specification_dir=None,  # TBD: package should use this specification object
+        duckdb_path=cache_dir / "overflow.duckdb",
+    )
+    pqpackage.load_facts(transformed_parquet_dir)
+    pqpackage.load_fact_resource(transformed_parquet_dir)
+    pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
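+    # the fact, fact_resource and entity tables are built as parquet first, then bulk loaded into the sqlite package below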
+
+    logger.info(f"loading fact, fact_resource and entity into {output_path}")
+    pqpackage.load_to_sqlite(output_path)
+
+    logger.info(f"add indexes to {output_path}")
+    package.connect()
+    package.create_cursor()
+    package.create_indexes()
+    package.disconnect()
+
+    logger.info(f"creating dataset package {output_path} counts")
     package.add_counts()
 
 
@@ -1148,7 +1212,6 @@ def get_resource_unidentified_lookups(
     # could alter resource_from_path to file from path and promote to a utils folder
     resource = resource_from_path(input_path)
     dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
-    custom_temp_dir = tmp_dir  # './var'
 
     print("")
     print("----------------------------------------------------------------------")
@@ -1202,7 +1265,6 @@ def get_resource_unidentified_lookups(
         ConvertPhase(
             path=input_path,
             dataset_resource_log=dataset_resource_log,
-            custom_temp_dir=custom_temp_dir,
         ),
         NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
         ParsePhase(),