15
15
#
16
16
17
17
import os
18
+ import tempfile
18
19
import time
19
20
import uuid
20
21
from typing import Dict , List , Union
21
22
22
- from hsml import util , constants
23
+ from hsml import constants
23
24
from hsml .client .exceptions import ModelServingException , RestAPIError
24
25
from hsml .client .istio .utils .infer_type import InferInput
25
26
from hsml .constants import (
31
32
INFERENCE_ENDPOINTS as IE ,
32
33
)
33
34
from hsml .core import dataset_api , serving_api
35
+ from hsml .engine import local_engine
34
36
from tqdm .auto import tqdm
35
37
36
38
@@ -51,6 +53,8 @@ def __init__(self):
51
53
self ._serving_api = serving_api .ServingApi ()
52
54
self ._dataset_api = dataset_api .DatasetApi ()
53
55
56
+ self ._engine = local_engine .LocalEngine ()
57
+
54
58
def _poll_deployment_status (
55
59
self , deployment_instance , status : str , await_status : int , update_progress = None
56
60
):
@@ -304,7 +308,64 @@ def _get_stopped_instances(self, available_instances, requested_instances):
304
308
num_instances = requested_instances - available_instances
305
309
return num_instances if num_instances >= 0 else 0
306
310
307
- def download_artifact (self , deployment_instance ):
311
+ def _download_files_from_hopsfs_recursive (
312
+ self ,
313
+ from_hdfs_path : str ,
314
+ to_local_path : str ,
315
+ update_download_progress ,
316
+ n_dirs ,
317
+ n_files ,
318
+ ):
319
+ """Download model files from a model path in hdfs, recursively"""
320
+
321
+ for entry in self ._dataset_api .list (from_hdfs_path , sort_by = "NAME:desc" )[
322
+ "items"
323
+ ]:
324
+ path_attr = entry ["attributes" ]
325
+ path = path_attr ["path" ]
326
+ basename = os .path .basename (path )
327
+
328
+ if path_attr .get ("dir" , False ):
329
+ # otherwise, make a recursive call for the folder
330
+ if (
331
+ basename == constants .MODEL_SERVING .ARTIFACTS_DIR_NAME
332
+ ): # TODO: Not needed anymore
333
+ continue # skip Artifacts subfolder
334
+ local_folder_path = os .path .join (to_local_path , basename )
335
+ os .mkdir (local_folder_path )
336
+ n_dirs , n_files = self ._download_files_from_hopsfs_recursive (
337
+ from_hdfs_path = path ,
338
+ to_local_path = local_folder_path ,
339
+ update_download_progress = update_download_progress ,
340
+ n_dirs = n_dirs ,
341
+ n_files = n_files ,
342
+ )
343
+ n_dirs += 1
344
+ update_download_progress (n_dirs = n_dirs , n_files = n_files )
345
+ else :
346
+ # if it's a file, download it
347
+ local_file_path = os .path .join (to_local_path , basename )
348
+ self ._engine .download (path , local_file_path )
349
+ n_files += 1
350
+ update_download_progress (n_dirs = n_dirs , n_files = n_files )
351
+
352
+ return n_dirs , n_files
353
+
354
+ def _download_files_from_hopsfs (
355
+ self , from_hdfs_path : str , to_local_path : str , update_download_progress
356
+ ):
357
+ """Download files from a model path in hdfs."""
358
+
359
+ n_dirs , n_files = self ._download_files_from_hopsfs_recursive (
360
+ from_hdfs_path = from_hdfs_path ,
361
+ to_local_path = to_local_path ,
362
+ update_download_progress = update_download_progress ,
363
+ n_dirs = 0 ,
364
+ n_files = 0 ,
365
+ )
366
+ update_download_progress (n_dirs = n_dirs , n_files = n_files , done = True )
367
+
368
+ def download_artifact_files (self , deployment_instance ):
308
369
if deployment_instance .id is None :
309
370
raise ModelServingException (
310
371
"Deployment is not created yet. To create the deployment use `.save()`"
@@ -316,30 +377,38 @@ def download_artifact(self, deployment_instance):
316
377
Download the model files by using `model.download()`"
317
378
)
318
379
319
- from_artifact_zip_path = deployment_instance .artifact_path
320
- to_artifacts_path = os .path .join (
321
- os .getcwd (),
380
+ artifact_files_path = os .path .join (
381
+ tempfile .gettempdir (),
322
382
str (uuid .uuid4 ()),
323
383
deployment_instance .model_name ,
324
384
str (deployment_instance .model_version ),
325
- constants .MODEL_REGISTRY .ARTIFACTS_DIR_NAME ,
326
- )
327
- to_artifact_version_path = (
328
- to_artifacts_path + "/" + str (deployment_instance .artifact_version )
385
+ constants .MODEL_SERVING .ARTIFACTS_DIR_NAME ,
386
+ str (deployment_instance .artifact_version ),
329
387
)
330
- to_artifact_zip_path = to_artifact_version_path + ".zip"
388
+ os . makedirs ( artifact_files_path )
331
389
332
- os .makedirs (to_artifacts_path )
390
+ def update_download_progress (n_dirs , n_files , done = False ):
391
+ print (
392
+ "Downloading artifact files (%s dirs, %s files)... %s"
393
+ % (n_dirs , n_files , "DONE" if done else "" ),
394
+ end = "\r " ,
395
+ )
333
396
334
397
try :
335
- self ._dataset_api .download (from_artifact_zip_path , to_artifact_zip_path )
336
- util .decompress (to_artifact_zip_path , extract_dir = to_artifacts_path )
337
- os .remove (to_artifact_zip_path )
338
- finally :
339
- if os .path .exists (to_artifact_zip_path ):
340
- os .remove (to_artifact_zip_path )
341
-
342
- return to_artifact_version_path
398
+ from_hdfs_path = deployment_instance .artifact_files_path
399
+ if from_hdfs_path .startswith ("hdfs:/" ):
400
+ projects_index = from_hdfs_path .find ("/Projects" , 0 )
401
+ from_hdfs_path = from_hdfs_path [projects_index :]
402
+
403
+ self ._download_files_from_hopsfs (
404
+ from_hdfs_path = from_hdfs_path ,
405
+ to_local_path = artifact_files_path ,
406
+ update_download_progress = update_download_progress ,
407
+ )
408
+ except BaseException as be :
409
+ raise be
410
+
411
+ return artifact_files_path
343
412
344
413
def create (self , deployment_instance ):
345
414
try :
0 commit comments