
Commit 021ba7d

[FSTORE-1522] Deduplicate most of hsml (#322)
* Add missing init in hopsworks_common.engine
* Add missing FeatureStoreEncoder to hsfs.util for backwards compatibility
* Deduplicate util
* Deduplicate kafka_topic
* Make direct imports instead of through aliases
* Deduplicate tag
* Deduplicate dataset_api
* Fix tag import
* Fix KafkaTopic
* Fix mocking of get_obj_from_json
* Fix mocking
* Fix mocking of hsml.util
* Fix mocking and imports of util
* Fix KafkaTopic.from_response_json
* Fix pandas typechecking in util
* Fix mocking in test_predictor
* Fix mocking in test_resources
* Fix mocking in test_transformer
1 parent a5c28e7 commit 021ba7d

35 files changed: +1024 −1277 lines

python/hopsworks_common/core/dataset_api.py

+297 −12
@@ -17,14 +17,16 @@
 from __future__ import annotations

 import copy
+import json
 import logging
 import math
 import os
 import shutil
 import time
 from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Literal, Optional, Union

-from hopsworks_common import client, usage, util
+from hopsworks_common import client, tag, usage, util
 from hopsworks_common.client.exceptions import DatasetException, RestAPIError
 from hopsworks_common.core import inode
 from tqdm.auto import tqdm
@@ -42,11 +44,24 @@ class DatasetApi:
     def __init__(self):
         self._log = logging.getLogger(__name__)

-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10 * 1024 * 1024
+    DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
+    DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1
+
+    DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1024 * 1024
     FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

+    # alias for backwards-compatibility:
+    DEFAULT_FLOW_CHUNK_SIZE = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE
+
     @usage.method_logger
-    def download(self, path: str, local_path: str = None, overwrite: bool = False):
+    def download(
+        self,
+        path: str,
+        local_path: Optional[str] = None,
+        overwrite: Optional[bool] = False,
+        chunk_size: int = DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE,
+    ):
         """Download file from Hopsworks Filesystem to the current working directory.

         ```python
@@ -64,6 +79,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
             path: path in Hopsworks filesystem to the file
             local_path: path where to download the file in the local filesystem
             overwrite: overwrite local file if exists
+            chunk_size: download chunk size in bytes. Default 1 MB
         # Returns
             `str`: Path to downloaded file
         # Raises
@@ -122,9 +138,7 @@ def download(self, path: str, local_path: str = None, overwrite: bool = False):
                 self._log.exception("Failed to initialize progress bar.")
                 self._log.info("Starting download")

-            for chunk in response.iter_content(
-                chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
-            ):
+            for chunk in response.iter_content(chunk_size=chunk_size):
                 f.write(chunk)

                 if pbar is not None:
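Since this hunk makes the download stream chunking caller-configurable, a minimal usage sketch may help. It assumes the usual `hopsworks.login()` entry point and that `get_dataset_api()` returns this `DatasetApi`; the file path is a placeholder.

```python
# Sketch: downloading with the new per-call chunk_size (placeholder path).
import hopsworks

project = hopsworks.login()
dataset_api = project.get_dataset_api()

# The stream is now read in caller-chosen chunks; 4 MB here instead of
# the 1 MB DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE default.
local_file = dataset_api.download(
    "Resources/data.csv",
    overwrite=True,
    chunk_size=4 * 1024 * 1024,
)
print(local_file)  # path to the downloaded file
```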
@@ -143,10 +157,10 @@ def upload(
         local_path: str,
         upload_path: str,
         overwrite: bool = False,
-        chunk_size=DEFAULT_FLOW_CHUNK_SIZE,
-        simultaneous_uploads=3,
-        max_chunk_retries=1,
-        chunk_retry_interval=1,
+        chunk_size: int = DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
+        simultaneous_uploads: int = DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+        max_chunk_retries: int = DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
+        chunk_retry_interval: int = 1,
     ):
         """Upload a file to the Hopsworks filesystem.

@@ -165,7 +179,7 @@ def upload(
             local_path: local path to file to upload
             upload_path: path to directory where to upload the file in Hopsworks Filesystem
             overwrite: overwrite file if exists
-            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            chunk_size: upload chunk size in bytes. Default 10 MB
             simultaneous_uploads: number of simultaneous chunks to upload. Default 3
             max_chunk_retries: maximum retry for a chunk. Default is 1
             chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
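For symmetry, a sketch of the renamed upload defaults, reusing `dataset_api` from the download sketch above; the local file and target directory are placeholders.

```python
# Sketch: the old inline defaults are now named class attributes, so a
# call site can reference them explicitly (values: 10 MB, 3, 1).
from hopsworks_common.core.dataset_api import DatasetApi

uploaded = dataset_api.upload(
    "model.pkl",   # placeholder local file
    "Resources",   # placeholder target directory
    overwrite=True,
    chunk_size=DatasetApi.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
    simultaneous_uploads=DatasetApi.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
    max_chunk_retries=DatasetApi.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
)

# The legacy constant still resolves, but now aliases the 1 MB download size:
assert DatasetApi.DEFAULT_FLOW_CHUNK_SIZE == DatasetApi.DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE
```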
@@ -327,6 +341,16 @@ def _get(self, path: str):
         headers = {"content-type": "application/json"}
         return _client._send_request("GET", path_params, headers=headers)

+    def get(self, path: str):
+        """Get dataset metadata.
+
+        :param path: path to the dataset
+        :type path: str
+        :return: dataset metadata
+        :rtype: dict
+        """
+        return self._get(path)
+
     def exists(self, path: str):
         """Check if a file exists in the Hopsworks Filesystem.

@@ -343,6 +367,16 @@ def exists(self, path: str):
         except RestAPIError:
             return False

+    def path_exists(self, remote_path: str):
+        """Check if a path exists in datasets.
+
+        :param remote_path: path to check
+        :type remote_path: str
+        :return: boolean whether path exists
+        :rtype: bool
+        """
+        return self.exists(remote_path)
+
     @usage.method_logger
     def remove(self, path: str):
         """Remove a path in the Hopsworks Filesystem.
@@ -354,7 +388,17 @@ def remove(self, path: str):
         """
         _client = client.get_instance()
         path_params = ["project", _client._project_id, "dataset", path]
-        _client._send_request("DELETE", path_params)
+        return _client._send_request("DELETE", path_params)
+
+    def rm(self, remote_path: str):
+        """Remove a path in the Hopsworks Filesystem.
+
+        # Arguments
+            remote_path: path to remove
+        # Raises
+            `RestAPIError`: If unable to remove the path
+        """
+        return self.remove(remote_path)

     @usage.method_logger
     def mkdir(self, path: str):
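The `get`, `path_exists`, and `rm` additions are thin wrappers that keep hsml-era call sites working. A sketch of the equivalences, reusing `dataset_api` from the earlier snippets, with placeholder paths:

```python
# path_exists/rm are aliases for exists/remove, so both spellings hit the
# same REST endpoints.
assert dataset_api.path_exists("Resources/data.csv") == dataset_api.exists(
    "Resources/data.csv"
)

# get() exposes the raw dataset metadata dict previously only used internally.
meta = dataset_api.get("Resources")
print(meta.get("zipState"))  # e.g. "NONE" when no archive operation is running

dataset_api.rm("Resources/old_dir")  # same effect as dataset_api.remove(...)
```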
@@ -520,6 +564,27 @@ def list_files(self, path, offset, limit):

         return inode_lst["count"], inode.Inode.from_response_json(inode_lst)

+    @usage.method_logger
+    def list(self, remote_path, sort_by=None, limit=1000):
+        """List all files in a directory in datasets.
+
+        :param remote_path: path to list
+        :type remote_path: str
+        :param sort_by: sort string
+        :type sort_by: str
+        :param limit: max number of returned files
+        :type limit: int
+        """
+        # this method is probably to be merged with list_files
+        # they seem to handle paths differently and return different results, which prevents the merge at the moment (2024-09-03), due to the requirement of backwards-compatibility
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+        query_params = {"action": "listing", "sort_by": sort_by, "limit": limit}
+        headers = {"content-type": "application/json"}
+        return _client._send_request(
+            "GET", path_params, headers=headers, query_params=query_params
+        )
+
     @usage.method_logger
     def read_content(self, path: str, dataset_type: str = "DATASET"):
         _client = client.get_instance()
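A short sketch of the compatibility `list` call; unlike `list_files` it returns the raw JSON payload, whose exact shape comes from the REST API, so the field access below is illustrative rather than guaranteed.

```python
# Sketch: raw listing of a directory, sorted server-side, at most 100 entries.
listing = dataset_api.list("Resources", sort_by="ID:asc", limit=100)

# The response is the unparsed JSON dict; "items" is an assumed key here.
for item in listing.get("items", []):
    print(item)
```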
@@ -538,3 +603,223 @@ def read_content(self, path: str, dataset_type: str = "DATASET"):
         }

         return _client._send_request("GET", path_params, query_params, stream=True)
+
+    def chmod(self, remote_path, permissions):
+        """Chmod operation on file or directory in datasets.
+
+        :param remote_path: path to chmod
+        :type remote_path: str
+        :param permissions: permissions string, for example u+x
+        :type permissions: str
+        """
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+        headers = {"content-type": "application/json"}
+        query_params = {"action": "PERMISSION", "permissions": permissions}
+        return _client._send_request(
+            "PUT", path_params, headers=headers, query_params=query_params
+        )
+
+    # region Archiving
+
+    def _archive(
+        self,
+        remote_path: str,
+        destination_path: Optional[str] = None,
+        block: bool = False,
+        timeout: Optional[int] = 120,
+        action: Union[Literal["unzip"], Literal["zip"]] = "unzip",
+    ):
+        """Internal (de)compression logic.
+
+        # Arguments
+            remote_path: path to file or directory to (un)zip.
+            destination_path: path to upload the zip, defaults to None; used only if action is zip.
+            block: whether the operation should block until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+            action: zip or unzip, defaults to unzip.
+
+        # Returns
+            `bool`: whether the operation completed within the specified timeout; if non-blocking, always returns True.
+        """
+
+        _client = client.get_instance()
+        path_params = ["project", _client._project_id, "dataset", remote_path]
+
+        query_params = {"action": action}
+
+        if destination_path is not None:
+            query_params["destination_path"] = destination_path
+            query_params["destination_type"] = "DATASET"
+
+        headers = {"content-type": "application/json"}
+
+        _client._send_request(
+            "POST", path_params, headers=headers, query_params=query_params
+        )
+
+        if not block:
+            # the call is successful at this point if we don't want to block
+            return True
+
+        # Wait for the zip file to appear. When it does, check that the parent dir zipState is not set to CHOWNING.
+        count = 0
+        while True:
+            if action == "zip":
+                zip_path = remote_path + ".zip"
+                # Get the status of the zipped file
+                if destination_path is None:
+                    zip_exists = self.path_exists(zip_path)
+                else:
+                    zip_exists = self.path_exists(
+                        destination_path + "/" + os.path.split(zip_path)[1]
+                    )
+                # Get the zipState of the directory being zipped
+                dir_status = self.get(remote_path)
+                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
+                if zip_exists and zip_state == "NONE":
+                    return True
+            elif action == "unzip":
+                # Get the status of the unzipped dir
+                unzipped_dir_exists = self.path_exists(
+                    remote_path[: remote_path.index(".")]
+                )
+                # Get the zipState of the zip being extracted
+                dir_status = self.get(remote_path)
+                zip_state = dir_status["zipState"] if "zipState" in dir_status else None
+                if unzipped_dir_exists and zip_state == "NONE":
+                    return True
+            time.sleep(1)
+            count += 1
+            if timeout is not None and count >= timeout:
+                self._log.info(
+                    f"Timeout of {timeout} seconds exceeded while waiting for {action} of {remote_path}."
+                )
+                return False
+
+    def unzip(
+        self, remote_path: str, block: bool = False, timeout: Optional[int] = 120
+    ):
+        """Unzip an archive in the dataset.
+
+        # Arguments
+            remote_path: path to file or directory to unzip.
+            block: whether the operation should block until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+
+        # Returns
+            `bool`: whether the operation completed within the specified timeout; if non-blocking, always returns True.
+        """
+        return self._archive(remote_path, block=block, timeout=timeout, action="unzip")
+
+    def zip(
+        self,
+        remote_path: str,
+        destination_path: Optional[str] = None,
+        block: bool = False,
+        timeout: Optional[int] = 120,
+    ):
+        """Zip a file or directory in the dataset.
+
+        # Arguments
+            remote_path: path to file or directory to zip.
+            destination_path: path to upload the zip, defaults to None.
+            block: whether the operation should block until complete, defaults to False.
+            timeout: timeout in seconds for the blocking, defaults to 120; if None, the blocking is unbounded.
+
+        # Returns
+            `bool`: whether the operation completed within the specified timeout; if non-blocking, always returns True.
+        """
+        return self._archive(
+            remote_path,
+            destination_path=destination_path,
+            block=block,
+            timeout=timeout,
+            action="zip",
+        )
+
+    # region Dataset Tags
+
+    def add(self, path, name, value):
+        """Attach a name/value tag to the given path in the dataset.
+
+        A tag consists of a name/value pair. Tag names are unique identifiers.
+        The value of a tag can be any valid json - primitives, arrays or json objects.
+
+        :param path: path to add the tag to
+        :type path: str
+        :param name: name of the tag to be added
+        :type name: str
+        :param value: value of the tag to be added
+        :type value: str
+        """
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "dataset",
+            "tags",
+            "schema",
+            name,
+            path,
+        ]
+        headers = {"content-type": "application/json"}
+        json_value = json.dumps(value)
+        _client._send_request("PUT", path_params, headers=headers, data=json_value)
+
+    def delete(self, path, name):
+        """Delete a tag.
+
+        Tag names are unique identifiers.
+
+        :param path: path to delete the tag from
+        :type path: str
+        :param name: name of the tag to be removed
+        :type name: str
+        """
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "dataset",
+            "tags",
+            "schema",
+            name,
+            path,
+        ]
+        _client._send_request("DELETE", path_params)
+
+    def get_tags(self, path, name: str = None):
+        """Get the tags attached to a path.
+
+        Gets all tags if no tag name is specified.
+
+        :param path: path to get the tags of
+        :type path: str
+        :param name: tag name
+        :type name: str
+        :return: dict of tag name/values
+        :rtype: dict
+        """
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "dataset",
+            "tags",
+        ]
+
+        if name is not None:
+            path_params.append("schema")
+            path_params.append(name)
+        else:
+            path_params.append("all")
+
+        path_params.append(path)
+
+        return {
+            tag._name: json.loads(tag._value)
+            for tag in tag.Tag.from_response_json(
+                _client._send_request("GET", path_params)
+            )
+        }
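Finally, a sketch tying the new archiving and tag helpers together, again reusing `dataset_api` from the earlier snippets; the paths and the tag schema name "quality" are placeholders, and the tag schema must already exist in the project.

```python
# Sketch: block up to 10 minutes while the server zips a directory.
finished = dataset_api.zip("Resources/experiments", block=True, timeout=600)
if not finished:
    print("zip still running server-side after 600s")

# Non-blocking unzip: returns True as soon as the request is accepted.
dataset_api.unzip("Resources/experiments.zip")

# Tags accept any JSON-serializable value ("quality" is a placeholder schema).
dataset_api.add("Resources/experiments", "quality", {"reviewed": True})
print(dataset_api.get_tags("Resources/experiments"))  # {"quality": {...}}
dataset_api.delete("Resources/experiments", "quality")
```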
