
Commit b256ef4

[HWORKS-1171] Make chunk size configurable when uploading models to t… (logicalclocks#228)

* [HWORKS-1171] Make chunk size configurable when uploading models to the model registry from outside
* Revert black changes to predictor_state.py

1 parent 96b5b91 · commit b256ef4

File tree: 7 files changed, +82 -23 lines

python/hsml/client/istio/grpc/proto/grpc_predict_v2_pb2.pyi (+6)

@@ -90,6 +90,7 @@ class ModelInferRequest(_message.Message):
     key: _Optional[str] = ...,
     value: _Optional[_Union[InferParameter, _Mapping]] = ...,
 ) -> None: ...
+
 CONTENTS_FIELD_NUMBER: _ClassVar[int]
 DATATYPE_FIELD_NUMBER: _ClassVar[int]
 NAME_FIELD_NUMBER: _ClassVar[int]
@@ -123,6 +124,7 @@ class ModelInferRequest(_message.Message):
     key: _Optional[str] = ...,
     value: _Optional[_Union[InferParameter, _Mapping]] = ...,
 ) -> None: ...
+
 NAME_FIELD_NUMBER: _ClassVar[int]
 PARAMETERS_FIELD_NUMBER: _ClassVar[int]
 name: str
@@ -144,6 +146,7 @@ class ModelInferRequest(_message.Message):
     key: _Optional[str] = ...,
     value: _Optional[_Union[InferParameter, _Mapping]] = ...,
 ) -> None: ...
+
 ID_FIELD_NUMBER: _ClassVar[int]
 INPUTS_FIELD_NUMBER: _ClassVar[int]
 MODEL_NAME_FIELD_NUMBER: _ClassVar[int]
@@ -201,6 +204,7 @@ class ModelInferResponse(_message.Message):
     key: _Optional[str] = ...,
     value: _Optional[_Union[InferParameter, _Mapping]] = ...,
 ) -> None: ...
+
 CONTENTS_FIELD_NUMBER: _ClassVar[int]
 DATATYPE_FIELD_NUMBER: _ClassVar[int]
 NAME_FIELD_NUMBER: _ClassVar[int]
@@ -231,6 +235,7 @@ class ModelInferResponse(_message.Message):
     key: _Optional[str] = ...,
     value: _Optional[_Union[InferParameter, _Mapping]] = ...,
 ) -> None: ...
+
 ID_FIELD_NUMBER: _ClassVar[int]
 MODEL_NAME_FIELD_NUMBER: _ClassVar[int]
 MODEL_VERSION_FIELD_NUMBER: _ClassVar[int]
@@ -284,6 +289,7 @@ class ModelMetadataResponse(_message.Message):
     datatype: _Optional[str] = ...,
     shape: _Optional[_Iterable[int]] = ...,
 ) -> None: ...
+
 INPUTS_FIELD_NUMBER: _ClassVar[int]
 NAME_FIELD_NUMBER: _ClassVar[int]
 OUTPUTS_FIELD_NUMBER: _ClassVar[int]

python/hsml/core/dataset_api.py (+14, -9)

@@ -38,17 +38,21 @@ class DatasetApi:
     def __init__(self):
         pass

-    DEFAULT_FLOW_CHUNK_SIZE = 1048576
+    DEFAULT_UPLOAD_FLOW_CHUNK_SIZE = 10
+    DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS = 3
+    DEFAULT_UPLOAD_MAX_CHUNK_RETRIES = 1
+
+    DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE = 1_048_576
     FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

     def upload(
         self,
         local_path: str,
         upload_path: str,
         overwrite: bool = False,
-        chunk_size=1048576,
-        simultaneous_uploads=3,
-        max_chunk_retries=1,
+        chunk_size=DEFAULT_UPLOAD_FLOW_CHUNK_SIZE,
+        simultaneous_uploads=DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+        max_chunk_retries=DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
         chunk_retry_interval=1,
     ):
         """Upload a file to the Hopsworks filesystem.
@@ -66,7 +70,7 @@ def upload(
             local_path: local path to file to upload
             upload_path: path to directory where to upload the file in Hopsworks Filesystem
             overwrite: overwrite file if exists
-            chunk_size: upload chunk size in bytes. Default 1048576 bytes
+            chunk_size: upload chunk size in megabytes. Default 10 MB
             simultaneous_uploads: number of simultaneous chunks to upload. Default 3
             max_chunk_retries: maximum retry for a chunk. Default is 1
             chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
@@ -86,6 +90,7 @@ def upload(
         _, file_name = os.path.split(local_path)

         destination_path = upload_path + "/" + file_name
+        chunk_size_bytes = chunk_size * 1024 * 1024

         if self.path_exists(destination_path):
             if overwrite:
@@ -97,10 +102,10 @@ def upload(
                 )
             )

-        num_chunks = math.ceil(file_size / chunk_size)
+        num_chunks = math.ceil(file_size / chunk_size_bytes)

         base_params = self._get_flow_base_params(
-            file_name, num_chunks, file_size, chunk_size
+            file_name, num_chunks, file_size, chunk_size_bytes
         )

         chunk_number = 1
@@ -119,7 +124,7 @@ def upload(
            while True:
                chunks = []
                for _ in range(simultaneous_uploads):
-                    chunk = f.read(chunk_size)
+                    chunk = f.read(chunk_size_bytes)
                    if not chunk:
                        break
                    chunks.append(Chunk(chunk, chunk_number, "pending"))
@@ -241,7 +246,7 @@ def download(self, path, local_path):
            downloaded = 0
            # if not response.headers.get("Content-Length"), file is still downloading
            for chunk in response.iter_content(
-                chunk_size=self.DEFAULT_FLOW_CHUNK_SIZE
+                chunk_size=self.DEFAULT_DOWNLOAD_FLOW_CHUNK_SIZE
            ):
                f.write(chunk)
                downloaded += len(chunk)
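
The key behavioral change above is that `chunk_size` in `DatasetApi.upload` is now expressed in megabytes and converted to bytes before the file is split, while downloads keep a separate byte-sized constant. A minimal sketch of the new chunking arithmetic, using a hypothetical 25 MB artifact (the file size below is made up purely for illustration):

import math

# Hypothetical inputs, chosen only to illustrate the arithmetic in upload().
chunk_size = 10                        # megabytes, the new DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
file_size = 25 * 1024 * 1024           # a 25 MB model artifact

chunk_size_bytes = chunk_size * 1024 * 1024           # same conversion as in upload()
num_chunks = math.ceil(file_size / chunk_size_bytes)  # 3 chunks: 10 MB + 10 MB + 5 MB

print(chunk_size_bytes, num_chunks)  # 10485760 3

Note the unit change when upgrading: a caller that previously passed a byte count such as chunk_size=1048576 would now be requesting chunks of roughly 1 TB.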

python/hsml/engine/hopsworks_engine.py (+1, -1)

@@ -33,7 +33,7 @@ def mkdir(self, remote_path: str):
     def delete(self, model_instance):
         self._model_api.delete(model_instance)

-    def upload(self, local_path: str, remote_path: str):
+    def upload(self, local_path: str, remote_path: str, upload_configuration=None):
         local_path = self._get_abs_path(local_path)
         remote_path = self._prepend_project_path(remote_path)
         self._native_hdfs_api.upload(local_path, remote_path)

python/hsml/engine/local_engine.py (+19, -2)

@@ -32,10 +32,27 @@ def mkdir(self, remote_path: str):
     def delete(self, model_instance):
         self._model_api.delete(model_instance)

-    def upload(self, local_path: str, remote_path: str):
+    def upload(self, local_path: str, remote_path: str, upload_configuration=None):
         local_path = self._get_abs_path(local_path)
         remote_path = self._prepend_project_path(remote_path)
-        self._dataset_api.upload(local_path, remote_path)
+
+        # Initialize the upload configuration to an empty dictionary if it is None
+        upload_configuration = upload_configuration if upload_configuration else {}
+        self._dataset_api.upload(
+            local_path,
+            remote_path,
+            chunk_size=upload_configuration.get(
+                "chunk_size", self._dataset_api.DEFAULT_UPLOAD_FLOW_CHUNK_SIZE
+            ),
+            simultaneous_uploads=upload_configuration.get(
+                "simultaneous_uploads",
+                self._dataset_api.DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS,
+            ),
+            max_chunk_retries=upload_configuration.get(
+                "max_chunk_retries",
+                self._dataset_api.DEFAULT_UPLOAD_MAX_CHUNK_RETRIES,
+            ),
+        )

     def download(self, remote_path: str, local_path: str):
         local_path = self._get_abs_path(local_path)
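
In `LocalEngine.upload`, the optional `upload_configuration` dictionary is resolved key by key, falling back to the `DatasetApi` defaults for anything the caller omits. A standalone sketch of that fallback pattern; the helper name and the literal defaults below are illustrative, not part of the library:

# Illustrative defaults mirroring DEFAULT_UPLOAD_FLOW_CHUNK_SIZE (MB),
# DEFAULT_UPLOAD_SIMULTANEOUS_UPLOADS and DEFAULT_UPLOAD_MAX_CHUNK_RETRIES.
DEFAULTS = {"chunk_size": 10, "simultaneous_uploads": 3, "max_chunk_retries": 1}


def resolve_upload_configuration(upload_configuration=None):
    """Merge a partial user configuration with the defaults, key by key."""
    upload_configuration = upload_configuration if upload_configuration else {}
    return {key: upload_configuration.get(key, default) for key, default in DEFAULTS.items()}


print(resolve_upload_configuration({"chunk_size": 50}))
# {'chunk_size': 50, 'simultaneous_uploads': 3, 'max_chunk_retries': 1}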

python/hsml/engine/model_engine.py (+21, -3)

@@ -203,6 +203,7 @@ def _upload_local_model(
         from_local_model_path,
         to_model_version_path,
         update_upload_progress,
+        upload_configuration=None,
     ):
         """Copy or upload model files from a local path to the model version folder in the Models dataset."""
         n_dirs, n_files = 0, 0
@@ -222,17 +223,30 @@ def _upload_local_model(
                     n_dirs += 1
                     update_upload_progress(n_dirs, n_files)
                 for f_name in files:
-                    self._engine.upload(root + "/" + f_name, remote_base_path)
+                    self._engine.upload(
+                        root + "/" + f_name,
+                        remote_base_path,
+                        upload_configuration=upload_configuration,
+                    )
                     n_files += 1
                     update_upload_progress(n_dirs, n_files)
         else:
             # if path is a file, upload file
-            self._engine.upload(from_local_model_path, to_model_version_path)
+            self._engine.upload(
+                from_local_model_path,
+                to_model_version_path,
+                upload_configuration=upload_configuration,
+            )
             n_files += 1
             update_upload_progress(n_dirs, n_files)

     def _save_model_from_local_or_hopsfs_mount(
-        self, model_instance, model_path, keep_original_files, update_upload_progress
+        self,
+        model_instance,
+        model_path,
+        keep_original_files,
+        update_upload_progress,
+        upload_configuration=None,
     ):
         """Save model files from a local path. The local path can be on hopsfs mount"""
         # check hopsfs mount
@@ -250,6 +264,7 @@ def _save_model_from_local_or_hopsfs_mount(
             from_local_model_path=model_path,
             to_model_version_path=model_instance.version_path,
             update_upload_progress=update_upload_progress,
+            upload_configuration=upload_configuration,
         )

     def _set_model_version(
@@ -297,6 +312,7 @@ def save(
         model_path,
         await_registration=480,
         keep_original_files=False,
+        upload_configuration=None,
     ):
         _client = client.get_instance()

@@ -377,6 +393,7 @@ def update_upload_progress(n_dirs=0, n_files=0):
                 model_path=model_path,
                 keep_original_files=keep_original_files,
                 update_upload_progress=update_upload_progress,
+                upload_configuration=upload_configuration,
             )
         # check local relative
         elif os.path.exists(
@@ -387,6 +404,7 @@ def update_upload_progress(n_dirs=0, n_files=0):
                 model_path=os.path.join(os.getcwd(), model_path),
                 keep_original_files=keep_original_files,
                 update_upload_progress=update_upload_progress,
+                upload_configuration=upload_configuration,
            )
         # check project relative
         elif self._dataset_api.path_exists(
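
`ModelEngine.save` simply threads the optional `upload_configuration` through `_save_model_from_local_or_hopsfs_mount` and `_upload_local_model` down to the engine's `upload`; only the dataset API interprets it. A simplified sketch of that pass-through design, using stand-in function names rather than the real hsml classes:

# Stand-in names for illustration; the real call chain lives in hsml.engine.model_engine.

def engine_upload(local_path, remote_path, upload_configuration=None):
    # Lowest layer: the configuration (or the defaults) is finally applied here.
    cfg = upload_configuration or {}
    print(f"upload {local_path} -> {remote_path} using {cfg}")


def upload_local_model(model_path, version_path, upload_configuration=None):
    # Intermediate layers pass the dictionary through untouched.
    engine_upload(model_path, version_path, upload_configuration=upload_configuration)


def save(model_path, upload_configuration=None):
    upload_local_model(model_path, "/Models/my_model/1", upload_configuration=upload_configuration)


save("model.pkl", upload_configuration={"chunk_size": 50})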

python/hsml/model.py (+15, -2)

@@ -16,7 +16,7 @@

 import json
 import humps
-from typing import Union, Optional
+from typing import Union, Optional, Dict, Any


 from hsml import client, util
@@ -93,13 +93,25 @@ def __init__(
         self._feature_view = feature_view
         self._training_dataset_version = training_dataset_version

-    def save(self, model_path, await_registration=480, keep_original_files=False):
+    def save(
+        self,
+        model_path,
+        await_registration=480,
+        keep_original_files=False,
+        upload_configuration: Optional[Dict[str, Any]] = None,
+    ):
         """Persist this model including model files and metadata to the model registry.

         # Arguments
             model_path: Local or remote (Hopsworks file system) path to the folder where the model files are located, or path to a specific model file.
             await_registration: Awaiting time for the model to be registered in Hopsworks.
             keep_original_files: If the model files are located in hopsfs, whether to move or copy those files into the Models dataset. Default is False (i.e., model files will be moved)
+            upload_configuration: When saving a model from outside Hopsworks, the model is uploaded to the model registry using the REST APIs. Each model artifact is divided into
+                chunks and each chunk uploaded independently. This parameter can be used to control the upload chunk size, the parallelism and the number of retries.
+                `upload_configuration` can contain the following keys:
+                * key `chunk_size`: size of each chunk in megabytes. Default 10.
+                * key `simultaneous_uploads`: number of chunks to upload in parallel. Default 3.
+                * key `max_chunk_retries`: number of times to retry the upload of a chunk in case of failure. Default 1.

         # Returns
             `Model`: The model metadata object.
@@ -109,6 +121,7 @@ def save(self, model_path, await_registration=480, keep_original_files=False):
             model_path,
             await_registration=await_registration,
             keep_original_files=keep_original_files,
+            upload_configuration=upload_configuration,
         )

     def download(self):
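
From the user's side, the new keyword is passed directly to `Model.save`. A usage sketch; the paths, model name, and metrics below are illustrative and assume the usual hsml connection flow, which may differ in your setup:

import hsml

# Connect to Hopsworks and get a model registry handle (connection details omitted for brevity).
connection = hsml.connection()
mr = connection.get_model_registry()

model = mr.python.create_model(name="my_model", metrics={"accuracy": 0.94})

# Upload with 50 MB chunks, 5 parallel chunk uploads, and up to 2 retries per chunk.
model.save(
    "/path/to/model_dir",
    upload_configuration={
        "chunk_size": 50,
        "simultaneous_uploads": 5,
        "max_chunk_retries": 2,
    },
)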

python/hsml/resources.py (+6, -6)

@@ -324,9 +324,9 @@ def to_dict(self):
         return {
             humps.camelize(self.NUM_INSTANCES_KEY): self._num_instances,
             humps.camelize(self.RESOURCES_CONFIG_KEY): {
-                "requests": self._requests.to_dict()
-                if self._requests is not None
-                else None,
+                "requests": (
+                    self._requests.to_dict() if self._requests is not None else None
+                ),
                 "limits": self._limits.to_dict() if self._limits is not None else None,
             },
         }
@@ -352,9 +352,9 @@ def to_dict(self):
         return {
             humps.camelize(self.NUM_INSTANCES_KEY): self._num_instances,
             humps.camelize(self.RESOURCES_CONFIG_KEY): {
-                "requests": self._requests.to_dict()
-                if self._requests is not None
-                else None,
+                "requests": (
+                    self._requests.to_dict() if self._requests is not None else None
+                ),
                 "limits": self._limits.to_dict() if self._limits is not None else None,
             },
         }
