Skip to content

Commit 273c302

Browse files
authored
[HWORKS-565] parallelize uploads (logicalclocks#208)
1 parent 8c99438 commit 273c302

File tree

1 file changed

+157
-23
lines changed

1 file changed

+157
-23
lines changed

python/hsml/core/dataset_api.py

+157-23
Original file line numberDiff line numberDiff line change
@@ -19,52 +19,186 @@
1919
import copy
import json
import logging
import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, wait

from tqdm.auto import tqdm

from hsml import client, tag
from hsml.client.exceptions import RestAPIError
class Chunk:
    """Bookkeeping for one slice of a chunked (flow.js-style) upload.

    Holds the raw bytes of the slice, its 1-based flow chunk number, a
    status string ("pending" / "uploading" / "uploaded" / "failed"), and
    the number of retries attempted so far.
    """

    def __init__(self, content, number, status):
        self.content, self.number, self.status = content, number, status
        # Incremented by the uploader each time this chunk fails and is retried.
        self.retries = 0
2435

2536

2637
class DatasetApi:
    """Client for dataset (file) operations against the Hopsworks REST API."""

    # Default chunk size (1 MiB) for the flow.js-style chunked upload protocol.
    DEFAULT_FLOW_CHUNK_SIZE = 1048576
    # HTTP status codes for which retrying a chunk upload cannot succeed.
    FLOW_PERMANENT_ERRORS = [404, 413, 415, 500, 501]

    def __init__(self):
        # Fix: upload() logs via self._log (e.g. when the tqdm progress bar
        # fails to initialize); without this attribute those paths raise
        # AttributeError instead of logging.
        self._log = logging.getLogger(__name__)
43+
44+
def upload(
    self,
    local_path: str,
    upload_path: str,
    overwrite: bool = False,
    chunk_size=1048576,
    simultaneous_uploads=3,
    max_chunk_retries=1,
    chunk_retry_interval=1,
):
    """Upload a file to the Hopsworks filesystem.

    ```python

    conn = hsml.connection(project="my-project")

    dataset_api = conn.get_dataset_api()

    uploaded_file_path = dataset_api.upload("my_local_file.txt", "Resources")

    ```
    # Arguments
        local_path: local path to file to upload
        upload_path: path to directory where to upload the file in Hopsworks Filesystem
        overwrite: overwrite file if exists
        chunk_size: upload chunk size in bytes. Default 1048576 bytes
        simultaneous_uploads: number of simultaneous chunks to upload. Default 3
        max_chunk_retries: maximum retry for a chunk. Default is 1
        chunk_retry_interval: chunk retry interval in seconds. Default is 1sec
    # Returns
        `str`: Path to uploaded file
    # Raises
        `RestAPIError`: If unable to upload the file
    """
    # Use a module logger directly so this method works even if the
    # instance was constructed without a `_log` attribute.
    log = logging.getLogger(__name__)

    # local path could be absolute or relative,
    if not os.path.isabs(local_path) and os.path.exists(
        os.path.join(os.getcwd(), local_path)
    ):
        local_path = os.path.join(os.getcwd(), local_path)

    file_size = os.path.getsize(local_path)

    _, file_name = os.path.split(local_path)

    destination_path = upload_path + "/" + file_name

    if self.path_exists(destination_path):
        if overwrite:
            self.rm(destination_path)
        else:
            # Fix: report the remote destination that already exists, not the
            # local source path.
            raise Exception(
                "{} already exists, set overwrite=True to overwrite it".format(
                    destination_path
                )
            )

    num_chunks = math.ceil(file_size / chunk_size)

    base_params = self._get_flow_base_params(
        file_name, num_chunks, file_size, chunk_size
    )

    chunk_number = 1
    with open(local_path, "rb") as f:
        # Progress bar is best-effort: a tqdm failure must not abort the upload.
        pbar = None
        try:
            pbar = tqdm(
                total=file_size,
                bar_format="{desc}: {percentage:.3f}%|{bar}| {n_fmt}/{total_fmt} elapsed<{elapsed} remaining<{remaining}",
                desc="Uploading",
            )
        except Exception:
            log.exception("Failed to initialize progress bar.")
            log.info("Starting upload")
        with ThreadPoolExecutor(simultaneous_uploads) as executor:
            while True:
                # Read up to `simultaneous_uploads` chunks from the file,
                # then fan them out to the worker pool as one batch.
                chunks = []
                for _ in range(simultaneous_uploads):
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    chunks.append(Chunk(chunk, chunk_number, "pending"))
                    chunk_number += 1

                if len(chunks) == 0:
                    break

                # upload each chunk and update pbar
                futures = [
                    executor.submit(
                        self._upload_chunk,
                        base_params,
                        upload_path,
                        file_name,
                        chunk,
                        pbar,
                        max_chunk_retries,
                        chunk_retry_interval,
                    )
                    for chunk in chunks
                ]
                # wait for all upload tasks to complete
                _, _ = wait(futures)
                try:
                    # Re-raise the first chunk failure (result() propagates it).
                    _ = [future.result() for future in futures]
                except Exception as e:
                    if pbar is not None:
                        pbar.close()
                    raise e

    if pbar is not None:
        pbar.close()
    else:
        log.info("Upload finished")

    # Fix: reuse the destination path computed above instead of rebuilding
    # it from os.path.basename(local_path) (same value, single source of truth).
    return destination_path
160+
161+
def _upload_chunk(
    self,
    base_params,
    upload_path,
    file_name,
    chunk: Chunk,
    pbar,
    max_chunk_retries,
    chunk_retry_interval,
):
    """Upload a single chunk, retrying transient REST errors.

    Updates `chunk.status` as the upload progresses and advances the
    progress bar on success. Re-raises the `RestAPIError` once the status
    code is a permanent flow error or the retry budget is exhausted.
    """
    # Each worker gets its own copy so concurrent chunks don't race on the
    # shared base parameter dict.
    params = copy.copy(base_params)
    params["flowCurrentChunkSize"] = len(chunk.content)
    params["flowChunkNumber"] = chunk.number

    chunk.status = "uploading"
    while True:
        try:
            self._upload_request(params, upload_path, file_name, chunk.content)
        except RestAPIError as err:
            chunk.retries += 1
            permanent = err.response.status_code in DatasetApi.FLOW_PERMANENT_ERRORS
            if permanent or chunk.retries > max_chunk_retries:
                chunk.status = "failed"
                raise err
            # Transient failure: back off briefly, then retry the same chunk.
            time.sleep(chunk_retry_interval)
        else:
            break

    chunk.status = "uploaded"

    if pbar is not None:
        pbar.update(params["flowCurrentChunkSize"])
63197

64-
def _get_flow_base_params(self, file_name, num_chunks, size):
198+
def _get_flow_base_params(self, file_name, num_chunks, size, chunk_size):
65199
return {
66200
"templateId": -1,
67-
"flowChunkSize": self.DEFAULT_FLOW_CHUNK_SIZE,
201+
"flowChunkSize": chunk_size,
68202
"flowTotalSize": size,
69203
"flowIdentifier": str(size) + "_" + file_name,
70204
"flowFilename": file_name,

0 commit comments

Comments
 (0)