Skip to content

Commit 4f17aad

Browse files
authored
[HWORKS-835] Download model artifact without compressing files (logicalclocks#209)
1 parent 273c302 commit 4f17aad

File tree

3 files changed

+71
-24
lines changed

3 files changed

+71
-24
lines changed

python/hsml/core/dataset_api.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,7 @@ def download(self, path, local_path):
239239
) as response:
240240
with open(local_path, "wb") as f:
241241
downloaded = 0
242-
file_size = response.headers.get("Content-Length")
243-
if not file_size:
244-
print("Downloading file ...", end=" ")
242+
# if not response.headers.get("Content-Length"), file is still downloading
245243
for chunk in response.iter_content(
246244
chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
247245
):

python/hsml/core/native_hdfs_api.py

-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def upload(self, local_path: str, remote_path: str):
4848

4949
def download(self, remote_path: str, local_path: str):
5050
# copy from hdfs to local fs
51-
print("Downloading file ...", end=" ")
5251
hdfs.get(remote_path, local_path)
5352

5453
def copy(self, source_path: str, destination_path: str):

python/hsml/engine/model_engine.py

+70-20
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,59 @@ def _copy_or_move_hopsfs_model(
117117
n_dirs += 1
118118
update_upload_progress(n_dirs=n_dirs, n_files=n_files)
119119

120+
def _download_model_from_hopsfs_recursive(
121+
self,
122+
from_hdfs_model_path: str,
123+
to_local_path: str,
124+
update_download_progress,
125+
n_dirs,
126+
n_files,
127+
):
128+
"""Download model files from a model path in hdfs, recursively"""
129+
130+
for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
131+
"items"
132+
]:
133+
path = entry["attributes"]["path"]
134+
basename = os.path.basename(path)
135+
if "." in path:
136+
# we assume that if a dot is contained in the path, it's the path to a file
137+
local_file_path = os.path.join(to_local_path, basename)
138+
self._engine.download(path, local_file_path)
139+
n_files += 1
140+
update_download_progress(n_dirs=n_dirs, n_files=n_files)
141+
else:
142+
# otherwise, it's a folder
143+
if basename == "Artifacts":
144+
continue # skip Artifacts subfolder
145+
local_folder_path = os.path.join(to_local_path, basename)
146+
os.mkdir(local_folder_path)
147+
n_dirs, n_files = self._download_model_from_hopsfs_recursive(
148+
from_hdfs_model_path=path,
149+
to_local_path=local_folder_path,
150+
update_download_progress=update_download_progress,
151+
n_dirs=n_dirs,
152+
n_files=n_files,
153+
)
154+
n_dirs += 1
155+
update_download_progress(n_dirs=n_dirs, n_files=n_files)
156+
157+
return n_dirs, n_files
158+
159+
def _download_model_from_hopsfs(
160+
self, from_hdfs_model_path: str, to_local_path: str, update_download_progress
161+
):
162+
"""Download model files from a model path in hdfs."""
163+
164+
n_dirs, n_files = self._download_model_from_hopsfs_recursive(
165+
from_hdfs_model_path=from_hdfs_model_path,
166+
to_local_path=to_local_path,
167+
update_download_progress=update_download_progress,
168+
n_dirs=0,
169+
n_files=0,
170+
)
171+
update_download_progress(n_dirs=n_dirs, n_files=n_files, done=True)
172+
120173
def _upload_local_model(
121174
self,
122175
from_local_model_path,
@@ -341,31 +394,28 @@ def download(self, model_instance):
341394
tempfile.gettempdir(), str(uuid.uuid4()), model_instance._name
342395
)
343396
model_version_path = model_name_path + "/" + str(model_instance._version)
344-
zip_path = model_version_path + ".zip"
345-
os.makedirs(model_name_path)
397+
os.makedirs(model_version_path)
346398

347-
temp_download_dir = "/Resources" + "/" + str(uuid.uuid4())
348-
try:
349-
self._engine.mkdir(temp_download_dir)
350-
self._dataset_api.zip(
351-
model_instance.version_path,
352-
destination_path=temp_download_dir,
353-
block=True,
354-
timeout=600,
399+
def update_download_progress(n_dirs, n_files, done=False):
400+
print(
401+
"Downloading model artifact (%s dirs, %s files)... %s"
402+
% (n_dirs, n_files, "DONE" if done else ""),
403+
end="\r",
355404
)
356-
self._engine.download(
357-
temp_download_dir + "/" + str(model_instance._version) + ".zip",
358-
zip_path,
405+
406+
try:
407+
from_hdfs_model_path = model_instance.version_path
408+
if from_hdfs_model_path.startswith("hdfs:/"):
409+
projects_index = from_hdfs_model_path.find("/Projects", 0)
410+
from_hdfs_model_path = from_hdfs_model_path[projects_index:]
411+
412+
self._download_model_from_hopsfs(
413+
from_hdfs_model_path=from_hdfs_model_path,
414+
to_local_path=model_version_path,
415+
update_download_progress=update_download_progress,
359416
)
360-
self._dataset_api.rm(temp_download_dir)
361-
util.decompress(zip_path, extract_dir=model_name_path)
362-
os.remove(zip_path)
363417
except BaseException as be:
364418
raise be
365-
finally:
366-
if os.path.exists(zip_path):
367-
os.remove(zip_path)
368-
self._dataset_api.rm(temp_download_dir)
369419

370420
return model_version_path
371421

0 commit comments

Comments
 (0)