Skip to content

Commit 8c99438

Browse files
authored
[HWORKS-829] Register models without compressing files (logicalclocks#206)
* [HWORKS-829] Upload uncompressed files, copy_from_local if internal, move if hopsfs-mount * Set keep_original_files default value to False * Reuse local/hopsworks engine for copy, move and download ops
1 parent eee4f5b commit 8c99438

File tree

6 files changed

+213
-69
lines changed

6 files changed

+213
-69
lines changed

python/hsml/constants.py

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class MODEL:
2525
FRAMEWORK_SKLEARN = "SKLEARN"
2626

2727

28+
class MODEL_REGISTRY:
29+
HOPSFS_MOUNT_PREFIX = "/home/yarnapp/hopsfs/"
30+
31+
2832
class MODEL_SERVING:
2933
MODELS_DATASET = "Models"
3034

python/hsml/core/native_hdfs_api.py

+19-2
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,22 @@ def chmod(self, hdfs_path, mode):
3939
def mkdir(self, path):
4040
return hdfs.mkdir(path)
4141

42-
def delete(self, path):
43-
hdfs.rm(path, recursive=True)
42+
def rm(self, path, recursive=True):
43+
hdfs.rm(path, recursive=recursive)
44+
45+
def upload(self, local_path: str, remote_path: str):
46+
# copy from local fs to hdfs
47+
hdfs.put(local_path, remote_path)
48+
49+
def download(self, remote_path: str, local_path: str):
50+
# copy from hdfs to local fs
51+
print("Downloading file ...", end=" ")
52+
hdfs.get(remote_path, local_path)
53+
54+
def copy(self, source_path: str, destination_path: str):
55+
# both paths are hdfs paths
56+
hdfs.cp(source_path, destination_path)
57+
58+
def move(self, source_path: str, destination_path: str):
59+
# both paths are hdfs paths
60+
hdfs.rename(source_path, destination_path)

python/hsml/engine/hopsworks_engine.py

+42-19
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,52 @@
1414
# limitations under the License.
1515
#
1616

17+
import os
18+
1719
from hsml.core import native_hdfs_api
18-
from hsml import constants
20+
from hsml import client
1921

2022

2123
class HopsworksEngine:
2224
def __init__(self):
2325
self._native_hdfs_api = native_hdfs_api.NativeHdfsApi()
2426

25-
def mkdir(self, model_instance):
26-
model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
27-
model_instance.project_name,
28-
constants.MODEL_SERVING.MODELS_DATASET,
29-
model_instance.name,
30-
str(model_instance.version),
31-
)
32-
self._native_hdfs_api.mkdir(model_version_dir_hdfs)
33-
self._native_hdfs_api.chmod(model_version_dir_hdfs, "ug+rwx")
34-
35-
def delete(self, model_instance):
36-
model_version_dir_hdfs = "/Projects/{}/{}/{}/{}".format(
37-
model_instance.project_name,
38-
constants.MODEL_SERVING.MODELS_DATASET,
39-
model_instance.name,
40-
str(model_instance.version),
41-
)
42-
self._native_hdfs_api.delete(model_version_dir_hdfs)
27+
def mkdir(self, remote_path: str):
28+
remote_path = self._prepend_project_path(remote_path)
29+
self._native_hdfs_api.mkdir(remote_path)
30+
self._native_hdfs_api.chmod(remote_path, "ug+rwx")
31+
32+
def delete(self, remote_path: str):
33+
remote_path = self._prepend_project_path(remote_path)
34+
self._native_hdfs_api.rm(remote_path)
35+
36+
def upload(self, local_path: str, remote_path: str):
37+
local_path = self._get_abs_path(local_path)
38+
remote_path = self._prepend_project_path(remote_path)
39+
self._native_hdfs_api.upload(local_path, remote_path)
40+
self._native_hdfs_api.chmod(remote_path, "ug+rwx")
41+
42+
def download(self, remote_path: str, local_path: str):
43+
local_path = self._get_abs_path(local_path)
44+
remote_path = self._prepend_project_path(remote_path)
45+
self._native_hdfs_api.download(remote_path, local_path)
46+
47+
def copy(self, source_path: str, destination_path: str):
48+
# both paths are hdfs paths
49+
source_path = self._prepend_project_path(source_path)
50+
destination_path = self._prepend_project_path(destination_path)
51+
self._native_hdfs_api.copy(source_path, destination_path)
52+
53+
def move(self, source_path: str, destination_path: str):
54+
source_path = self._prepend_project_path(source_path)
55+
destination_path = self._prepend_project_path(destination_path)
56+
self._native_hdfs_api.move(source_path, destination_path)
57+
58+
def _get_abs_path(self, local_path: str):
59+
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)
60+
61+
def _prepend_project_path(self, remote_path: str):
62+
if not remote_path.startswith("/Projects/"):
63+
_client = client.get_instance()
64+
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
65+
return remote_path

python/hsml/engine/local_engine.py

+38-4
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,49 @@
1414
# limitations under the License.
1515
#
1616

17+
import os
18+
1719
from hsml.core import dataset_api
20+
from hsml import client
1821

1922

2023
class LocalEngine:
2124
def __init__(self):
2225
self._dataset_api = dataset_api.DatasetApi()
2326

24-
def mkdir(self, model_instance):
25-
self._dataset_api.mkdir(model_instance.version_path)
27+
def mkdir(self, remote_path: str):
28+
remote_path = self._prepend_project_path(remote_path)
29+
self._dataset_api.mkdir(remote_path)
30+
31+
def delete(self, remote_path: str):
32+
remote_path = self._prepend_project_path(remote_path)
33+
self._dataset_api.rm(remote_path)
34+
35+
def upload(self, local_path: str, remote_path: str):
36+
local_path = self._get_abs_path(local_path)
37+
remote_path = self._prepend_project_path(remote_path)
38+
self._dataset_api.upload(local_path, remote_path)
39+
40+
def download(self, remote_path: str, local_path: str):
41+
local_path = self._get_abs_path(local_path)
42+
remote_path = self._prepend_project_path(remote_path)
43+
self._dataset_api.download(remote_path, local_path)
44+
45+
def copy(self, source_path, destination_path):
46+
source_path = self._prepend_project_path(source_path)
47+
destination_path = self._prepend_project_path(destination_path)
48+
self._dataset_api.copy(source_path, destination_path)
49+
50+
def move(self, source_path, destination_path):
51+
source_path = self._prepend_project_path(source_path)
52+
destination_path = self._prepend_project_path(destination_path)
53+
self._dataset_api.move(source_path, destination_path)
54+
55+
def _get_abs_path(self, local_path: str):
56+
return local_path if os.path.isabs(local_path) else os.path.abspath(local_path)
2657

27-
def delete(self, model_instance):
28-
self._dataset_api.rm(model_instance.version_path)
58+
def _prepend_project_path(self, remote_path: str):
59+
if not remote_path.startswith("/Projects/"):
60+
_client = client.get_instance()
61+
remote_path = "/Projects/{}/{}".format(_client._project_name, remote_path)
62+
return remote_path

0 commit comments

Comments
 (0)