Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/parquet owen #325

Merged
merged 52 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
5ed2010
add convert to parquet function in and run in the pipeline
eveleighoj Dec 5, 2024
8622c43
correct imports
eveleighoj Dec 5, 2024
ff82675
correct arg
eveleighoj Dec 5, 2024
f93068f
current argument names
eveleighoj Dec 5, 2024
271e664
correct input for csv to pq
eveleighoj Dec 5, 2024
cdd480e
add parquet package back in
eveleighoj Dec 5, 2024
e6ac9bf
tweak to remove temptable
eveleighoj Dec 5, 2024
37ad0c7
add updated tests
eveleighoj Jan 6, 2025
5a621ae
make acceptance test work
eveleighoj Jan 8, 2025
60ee6fe
ensure no error if all orgs are blank
eveleighoj Jan 10, 2025
792b9a6
add converted resources to the cache directory
eveleighoj Jan 11, 2025
a0a445e
change to cache directory
eveleighoj Jan 11, 2025
b1d08c7
change to cache directory default
eveleighoj Jan 11, 2025
175f465
use output path in other function
eveleighoj Jan 12, 2025
cbb5eec
use a specific path for duck.db database to help with overflow
eveleighoj Jan 14, 2025
f56cf08
make query entity range specific
eveleighoj Jan 15, 2025
7c04686
add condition for no range
eveleighoj Jan 15, 2025
ccd1dda
update argument in wrong place
eveleighoj Jan 15, 2025
753b40a
start range in correct place
eveleighoj Jan 15, 2025
25f7618
get value from tuple
eveleighoj Jan 15, 2025
1b8f7a4
correct max in for loop
eveleighoj Jan 15, 2025
73835e9
iterate file count
eveleighoj Jan 16, 2025
8938a25
build a single file
eveleighoj Jan 16, 2025
0f590eb
remove cache_dir thats not provided
eveleighoj Jan 16, 2025
e7a3f7f
make the right directory
eveleighoj Jan 16, 2025
f0722c5
need remove whole tree
eveleighoj Jan 16, 2025
b89079a
run full command
eveleighoj Jan 16, 2025
57feeb8
change logging level
eveleighoj Jan 16, 2025
2917f43
make parquet package print info logs
eveleighoj Jan 16, 2025
3d3abd9
make integer
eveleighoj Jan 16, 2025
3453840
include entity field change
eveleighoj Jan 16, 2025
f43907d
ensure where clause isn't used if no range is supplied
eveleighoj Jan 16, 2025
c94d4f1
use better logging
eveleighoj Jan 16, 2025
8ec07f9
logging tweaks
eveleighoj Jan 16, 2025
bd0749d
add loading loggings
eveleighoj Jan 16, 2025
ba48403
remove indexes before loading
eveleighoj Jan 16, 2025
0bf17f2
add create cursor
eveleighoj Jan 16, 2025
0183d9b
remove error and some wrong validation
eveleighoj Jan 23, 2025
4a605ad
change package structure
eveleighoj Jan 24, 2025
14e015b
merge in main
eveleighoj Jan 24, 2025
1b34bd1
align with proper convert changes
eveleighoj Jan 24, 2025
1b29314
fix convert tests
eveleighoj Feb 11, 2025
930a180
update file for black with python 3.9
eveleighoj Feb 11, 2025
40ce011
remove custom_tmp_dir
eveleighoj Feb 12, 2025
f326c4a
remove document and documentation-url as they've been removed from the…
eveleighoj Feb 12, 2025
b2c40ef
correct test
eveleighoj Feb 13, 2025
e3707f6
correct column mapping test
eveleighoj Feb 13, 2025
2d62a83
correct lookup phase test
eveleighoj Feb 13, 2025
d258699
fix test after changes parquet structure
eveleighoj Feb 13, 2025
4ffcfcc
remove tmp_dir
eveleighoj Feb 13, 2025
bf3566b
try name change
eveleighoj Feb 13, 2025
320ef2e
Merge branch 'main' into feat/parquet_owen
eveleighoj Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions digital_land/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ def convert_cmd(input_path, output_path):
@column_field_dir
@dataset_resource_dir
@issue_dir
@click.option(
"--cache-dir",
type=click.Path(),
default="var/cache",
help="link to a cache directory to store temporary data that can be deleted once process is finished",
)
@click.option(
"--resource-path",
type=click.Path(exists=True),
default="collection/resource.csv",
help="link to where the resource list is stored",
)
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@click.pass_context
def dataset_create_cmd(
Expand All @@ -157,6 +169,8 @@ def dataset_create_cmd(
column_field_dir,
dataset_resource_dir,
issue_dir,
cache_dir,
resource_path,
):
return dataset_create(
input_paths=input_paths,
Expand All @@ -168,6 +182,8 @@ def dataset_create_cmd(
column_field_dir=column_field_dir,
dataset_resource_dir=dataset_resource_dir,
issue_dir=issue_dir,
cache_dir=cache_dir,
resource_path=resource_path,
)


Expand All @@ -194,7 +210,11 @@ def dataset_dump_flattened_cmd(ctx, input_path, output_path):
@click.option("--endpoints", help="list of endpoint hashes", default="")
@click.option("--organisations", help="list of organisations", default="")
@click.option("--entry-date", help="default entry-date value", default="")
@click.option("--custom-temp-dir", help="default temporary directory", default=None)
@click.option(
"--cache-dir",
help="cache directory to store conveted files etc. in",
default="var/cache",
)
@click.option("--config-path", help="Path to a configuration sqlite", default=None)
@click.option(
"--resource",
Expand Down Expand Up @@ -224,7 +244,7 @@ def pipeline_command(
endpoints,
organisations,
entry_date,
custom_temp_dir,
cache_dir,
collection_dir,
operational_issue_dir,
config_path,
Expand Down Expand Up @@ -255,7 +275,7 @@ def pipeline_command(
endpoints=endpoints,
organisations=organisations,
entry_date=entry_date,
custom_temp_dir=custom_temp_dir,
cache_dir=cache_dir,
config_path=config_path,
resource=resource,
output_log_dir=output_log_dir,
Expand Down
100 changes: 81 additions & 19 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
ConvertedResourceLog,
)
from digital_land.organisation import Organisation

from digital_land.package.dataset import DatasetPackage
from digital_land.package.dataset_parquet import DatasetParquetPackage
from digital_land.phase.combine import FactCombinePhase
from digital_land.phase.concat import ConcatFieldPhase
from digital_land.phase.convert import ConvertPhase, execute
Expand All @@ -55,6 +57,7 @@
from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase
from digital_land.phase.save import SavePhase
from digital_land.pipeline import run_pipeline, Lookups, Pipeline
from digital_land.pipeline.process import convert_tranformed_csv_to_pq
from digital_land.schema import Schema
from digital_land.update import add_source_endpoint
from digital_land.configuration.main import Config
Expand Down Expand Up @@ -172,17 +175,17 @@ def collection_retire_endpoints_and_sources(
#
# pipeline commands
#
def convert(input_path, output_path, custom_temp_dir=None):
def convert(input_path, output_path):
if not output_path:
output_path = default_output_path("converted", input_path)
dataset_resource_log = DatasetResourceLog()
converted_resource_log = ConvertedResourceLog()
# TBD: this actually duplicates the data and does nothing else; should it just convert it?
run_pipeline(
ConvertPhase(
input_path,
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
custom_temp_dir=custom_temp_dir,
),
DumpPhase(output_path),
)
Expand All @@ -201,10 +204,11 @@ def pipeline_run(
operational_issue_dir="performance/operational_issue/",
organisation_path=None,
save_harmonised=False,
# TBD save all logs in a log directory, this will mean only one path passed in.
column_field_dir=None,
dataset_resource_dir=None,
converted_resource_dir=None,
custom_temp_dir=None, # TBD: rename to "tmpdir"
cache_dir="var/cache",
endpoints=[],
organisations=[],
entry_date="",
Expand All @@ -213,6 +217,9 @@ def pipeline_run(
output_log_dir=None,
converted_path=None,
):
# set up paths
cache_dir = Path(cache_dir)

if resource is None:
resource = resource_from_path(input_path)
dataset = dataset
Expand Down Expand Up @@ -276,7 +283,6 @@ def pipeline_run(
path=input_path,
dataset_resource_log=dataset_resource_log,
converted_resource_log=converted_resource_log,
custom_temp_dir=custom_temp_dir,
output_path=converted_path,
),
NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
Expand Down Expand Up @@ -353,9 +359,18 @@ def pipeline_run(
issue_log.save(os.path.join(issue_dir, resource + ".csv"))
issue_log.save_parquet(os.path.join(output_log_dir, "issue/"))
operational_issue_log.save(output_dir=operational_issue_dir)
column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
if column_field_dir:
column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
# create converted parquet in the var directory
cache_dir = Path(organisation_path).parent
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
convert_tranformed_csv_to_pq(
input_path=output_path,
output_path=transformed_parquet_dir / f"{resource}.parquet",
)


#
Expand All @@ -371,42 +386,91 @@ def dataset_create(
issue_dir="issue",
column_field_dir="var/column-field",
dataset_resource_dir="var/dataset-resource",
cache_dir="var/cache",
resource_path="collection/resource.csv",
):
# set level for logging to see what's going on
logger.setLevel(logging.INFO)
logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)

# check all paths are Path objects
issue_dir = Path(issue_dir)
column_field_dir = Path(column_field_dir)
dataset_resource_dir = Path(dataset_resource_dir)
cache_dir = Path(cache_dir)
resource_path = Path(resource_path)

# get the transformed files from the cache directory; this is assumed right now but we may want to be stricter in the future
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset

# create directory for dataset_parquet_package; will create a general provenance one for now
dataset_parquet_path = cache_dir / "provenance"

if not output_path:
print("missing output path", file=sys.stderr)
sys.exit(2)

# Set up initial objects
column_field_dir = Path(column_field_dir)
dataset_resource_dir = Path(dataset_resource_dir)
organisation = Organisation(
organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
)

# create sqlite dataset package as before and load in data that isn't in the parquet package yet
package = DatasetPackage(
dataset,
organisation=organisation,
path=output_path,
specification_dir=None, # TBD: package should use this specification object
)
package.create()
# don't use create as we don't want to create the indexes
package.create_database()
package.disconnect()
for path in input_paths:
path_obj = Path(path)
package.load_transformed(path)
package.load_column_fields(column_field_dir / dataset / path_obj.name)
package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
package.load_entities()

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
if os.path.exists(old_entity_path):
logging.info(f"loading column field log into {output_path}")
package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
logging.info(f"loading dataset resource log into {output_path}")
package.load_dataset_resource(
dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
)
logging.info(f"loading old entities into {output_path}")
old_entity_path = Path(pipeline.path) / "old-entity.csv"
if old_entity_path.exists():
package.load_old_entities(old_entity_path)

issue_paths = os.path.join(issue_dir, dataset)
if os.path.exists(issue_paths):
logging.info(f"loading issues into {output_path}")
issue_paths = issue_dir / dataset
if issue_paths.exists():
for issue_path in os.listdir(issue_paths):
package.load_issues(os.path.join(issue_paths, issue_path))
else:
logging.warning("No directory for this dataset in the provided issue_directory")

# Repeat for parquet
# Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

pqpackage = DatasetParquetPackage(
dataset,
path=dataset_parquet_path,
specification_dir=None, # TBD: package should use this specification object
duckdb_path=cache_dir / "overflow.duckdb",
)
pqpackage.load_facts(transformed_parquet_dir)
pqpackage.load_fact_resource(transformed_parquet_dir)
pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)

logger.info("loading fact,fact_resource and entity into {output_path}")
pqpackage.load_to_sqlite(output_path)

logger.info(f"add indexes to {output_path}")
package.connect()
package.create_cursor()
package.create_indexes()
package.disconnect()

logger.info(f"creating dataset package {output_path} counts")
package.add_counts()


Expand Down Expand Up @@ -1148,7 +1212,6 @@ def get_resource_unidentified_lookups(
# could alter resource_from_path to file from path and promote to a utils folder
resource = resource_from_path(input_path)
dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
custom_temp_dir = tmp_dir # './var'

print("")
print("----------------------------------------------------------------------")
Expand Down Expand Up @@ -1202,7 +1265,6 @@ def get_resource_unidentified_lookups(
ConvertPhase(
path=input_path,
dataset_resource_log=dataset_resource_log,
custom_temp_dir=custom_temp_dir,
),
NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
ParsePhase(),
Expand Down
Loading