Skip to content

Commit

Permalink
[feat] add process split-trip-geometries
Browse files Browse the repository at this point in the history
  • Loading branch information
datanel committed Feb 1, 2024
1 parent e7cdcea commit 8e70b9a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 14 deletions.
6 changes: 4 additions & 2 deletions docker/debian8/Dockerfile-tyr-worker
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
FROM navitia/master

# Install binary enrich-ntfs-with-addresses from tartare-tools
ENV TARTARE_TOOLS_VERSION="v0.39.0"
# Install some binaries from tartare-tools
ENV TARTARE_TOOLS_VERSION="v0.42.0"
ARG GITHUB_TOKEN
RUN git config --global url."https://x-access-token:${GITHUB_TOKEN}@github.com/hove-io/".insteadOf "ssh://git@github.com/hove-io/"
RUN git clone -b ${TARTARE_TOOLS_VERSION} --depth 1 https://x-access-token:${GITHUB_TOKEN}@github.com/hove-io/tartare-tools

RUN cd tartare-tools \
&& cargo build --release -p enrich-ntfs-with-addresses \
&& cargo build --release -p split-trip-geometries \
&& cp target/release/enrich-ntfs-with-addresses /usr/bin/ \
&& cp target/release/split-trip-geometries /usr/bin/ \
&& cd .. \
&& rm -rf tartare-tools

Expand Down
64 changes: 52 additions & 12 deletions source/tyr/tyr/binarisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,11 +1125,17 @@ def poi2mimir(self, instance_name, input, autocomplete_version, job_id=None, dat
@celery.task(bind=True)
def fusio2s3(self, instance_config, filename, job_id, dataset_uid):
"""Zip fusio file and launch fusio2s3"""
filename = enrich_ntfs_with_addresses("fusio", instance_config, filename, job_id, dataset_uid)

root_dir = os.path.dirname(filename)
loki_dir = os.path.join(root_dir, "for_loki")
os.makedirs(loki_dir, 0o755)

filename = enrich_ntfs_with_addresses("fusio", instance_config, loki_dir, filename, job_id, dataset_uid)
filename = split_trip_geometries(loki_dir, filename, job_id, dataset_uid)
_inner_2s3(self, "fusio", instance_config, filename, job_id, dataset_uid)


def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id, dataset_uid):
def enrich_ntfs_with_addresses(dataset_type, instance_config, loki_dir, filename, job_id, dataset_uid):
"""launch enrich-ntfs-with-addresses"""

job = models.Job.query.get(job_id)
Expand All @@ -1139,13 +1145,8 @@ def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id,
logger = get_instance_logger(instance, task_id=job_id)
filename = zip_if_needed(filename)

file_dir = os.path.dirname(filename)
file_basename = os.path.basename(filename)
output_dir = file_dir + "/for_loki"
os.makedirs(output_dir, 0o755)
output = output_dir + "/" + file_basename

previous_ntfs_path = output_dir + "/previous_ntfs.zip"
output = os.path.join(loki_dir, "with_addresses.zip")
previous_ntfs_path = os.path.join(loki_dir, "/previous_ntfs.zip")

file_key = "{coverage}/{dataset_type}.zip".format(coverage=instance_config.name, dataset_type=dataset_type)

Expand All @@ -1171,11 +1172,50 @@ def enrich_ntfs_with_addresses(dataset_type, instance_config, filename, job_id,
if use_previous_ntfs:
params.extend(["--previous-ntfs", previous_ntfs_path])

binary = "enrich-ntfs-with-addresses"

res = None
with collect_metric(binary, job, dataset_uid):
res = launch_exec(binary, params, logger)
if res != 0:
raise ValueError("{} failed".format(binary))
except:
logger.exception("")
job.state = "failed"
dataset.state = "failed"
raise
finally:
models.db.session.commit()

return output


def split_trip_geometries(loki_dir, filename, job_id, dataset_uid):
"""launch split-trip-geometries"""

job = models.Job.query.get(job_id)
dataset = _retrieve_dataset_and_set_state("fusio", job.id)
instance = job.instance

logger = get_instance_logger(instance, task_id=job_id)
filename = zip_if_needed(filename)

output = os.path.join(loki_dir, "with_split_trip_geometries.zip")

try:
params = [
"--input",
filename,
"--output",
output,
]

binary = "split-trip-geometries"
res = None
with collect_metric("enrich-ntfs-with-addresses", job, dataset_uid):
res = launch_exec("enrich-ntfs-with-addresses", params, logger)
with collect_metric(binary, job, dataset_uid):
res = launch_exec(binary, params, logger)
if res != 0:
raise ValueError("enrich-ntfs-with-addresses failed")
raise ValueError("{} failed".format(binary))
except:
logger.exception("")
job.state = "failed"
Expand Down

0 comments on commit 8e70b9a

Please sign in to comment.