Skip to content

Commit 788dda8

Browse files
committed
Encapsulate file mirroring in service class
1 parent 32f19dd commit 788dda8

File tree

3 files changed

+185
-64
lines changed

3 files changed

+185
-64
lines changed

.mypy.ini

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ modules =
5252
azul.indexer.field,
5353
azul.indexer.action_controller,
5454
azul.indexer.mirror_controller,
55-
azul.service.app_controller
55+
azul.service.app_controller,
56+
azul.indexer.mirror_service
5657
packages =
5758
azul.openapi
5859

scripts/mirror_file.py

Lines changed: 13 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
"""
66
import argparse
77
import logging
8-
import math
9-
import string
108
import sys
119

1210
from azul import (
@@ -17,24 +15,16 @@
1715
from azul.args import (
1816
AzulArgumentHelpFormatter,
1917
)
20-
from azul.azulclient import (
21-
AzulClient,
22-
)
23-
from azul.deployment import (
24-
aws,
25-
)
26-
from azul.drs import (
27-
AccessMethod,
28-
)
2918
from azul.http import (
3019
http_client,
3120
)
21+
from azul.indexer.mirror_service import (
22+
FilePart,
23+
MirrorService,
24+
)
3225
from azul.logging import (
3326
configure_script_logging,
3427
)
35-
from azul.plugins import (
36-
File,
37-
)
3828
from azul.plugins.metadata.hca import (
3929
HCAFile,
4030
)
@@ -47,9 +37,6 @@
4737
from azul.service.source_service import (
4838
SourceService,
4939
)
50-
from azul.service.storage_service import (
51-
StorageService,
52-
)
5340

5441
log = logging.getLogger(__name__)
5542

@@ -70,63 +57,26 @@ def get_file(catalog: CatalogName, file_uuid: str) -> HCAFile:
7057
return file
7158

7259

73-
def get_download_url(catalog: CatalogName, file: File) -> str:
74-
drs_uri = file.drs_uri
75-
drs = AzulClient().repository_plugin(catalog).drs_client()
76-
access = drs.get_object(drs_uri, AccessMethod.gs)
77-
assert access.method is AccessMethod.https, access
78-
return access.url
79-
80-
81-
def object_key(file: File) -> str:
82-
digest, digest_type = file.digest()
83-
assert all(c in string.hexdigits for c in digest), R(
84-
'Expected a hexadecimal digest', digest)
85-
return f'file/{digest.lower()}.{digest_type}'
86-
87-
8860
def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
8961
assert config.enable_mirroring, R('Mirroring must be enabled')
9062
assert config.is_tdr_enabled(catalog), R('Only TDR catalogs are supported')
91-
assert config.is_hca_enabled(catalog), R('Only HCA catalogs are supported')
92-
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
93-
assert 5 * 2 ** 20 <= part_size <= 5 * 2 ** 30, R(
94-
'Invalid part size', part_size)
9563
file = get_file(catalog, file_uuid)
96-
download_url = get_download_url(catalog, file)
97-
key = object_key(file)
98-
storage = StorageService(bucket_name=aws.mirror_bucket)
99-
upload = storage.create_multipart_upload(key, content_type=file.content_type)
100-
101-
total_size = file.size
102-
part_count = math.ceil(total_size / part_size)
103-
assert part_count <= 10000, R('Part size is too small for this file',
104-
total_size, part_size, part_count)
105-
106-
def file_part(part_number: int) -> str:
107-
start = part_number * part_size
108-
end = min((part_number + 1) * part_size - 1, total_size)
109-
response = http.request('GET',
110-
download_url,
111-
headers={'Range': f'bytes={start}-{end}'})
112-
if response.status == 206:
113-
return storage.upload_multipart_part(response.data,
114-
part_number + 1,
115-
upload)
116-
else:
117-
raise RuntimeError('Unexpected response from repository', response.status)
118-
119-
etags = list(map(file_part, range(part_count)))
120-
storage.complete_multipart_upload(upload, etags)
121-
return storage.get_presigned_url(key, file.name)
64+
service = MirrorService()
65+
upload_id = service.begin_mirroring_file(file)
66+
etags = [
67+
service.mirror_file_part(catalog, file, part, upload_id)
68+
for part in FilePart.partition(file, part_size)
69+
]
70+
service.finish_mirroring_file(file, upload_id, etags=etags)
71+
return service.get_mirror_url(file)
12272

12373

12474
def main(argv):
12575
parser = argparse.ArgumentParser(description=__doc__,
12676
formatter_class=AzulArgumentHelpFormatter)
12777
parser.add_argument('-c', '--catalog', default=config.default_catalog)
12878
parser.add_argument('-f', '--file-uuid')
129-
parser.add_argument('-p', '--part-size', type=int, default=50 * 2 ** 20)
79+
parser.add_argument('-p', '--part-size', type=int, default=FilePart.default_size)
13080
args = parser.parse_args(argv)
13181
signed_url = mirror_file(args.catalog, args.file_uuid, args.part_size)
13282
print(signed_url)

src/azul/indexer/mirror_service.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import logging
2+
import math
3+
import string
4+
import time
5+
from typing import (
6+
ClassVar,
7+
Iterable,
8+
Self,
9+
Sequence,
10+
TYPE_CHECKING,
11+
)
12+
13+
import attrs
14+
15+
from azul import (
16+
CatalogName,
17+
R,
18+
cache,
19+
cached_property,
20+
config,
21+
)
22+
from azul.attrs import (
23+
SerializableAttrs,
24+
)
25+
from azul.deployment import (
26+
aws,
27+
)
28+
from azul.drs import (
29+
AccessMethod,
30+
)
31+
from azul.http import (
32+
HasCachedHttpClient,
33+
)
34+
from azul.plugins import (
35+
File,
36+
RepositoryPlugin,
37+
)
38+
from azul.service.storage_service import (
39+
StorageService,
40+
)
41+
42+
if TYPE_CHECKING:
43+
from mypy_boto3_s3.service_resource import (
44+
MultipartUpload,
45+
)
46+
47+
log = logging.getLogger(__name__)
48+
49+
50+
@attrs.frozen(auto_attribs=True, kw_only=True)
51+
class FilePart(SerializableAttrs):
52+
part_number: int # Starts at 1
53+
start: int
54+
end: int # Included in the part
55+
56+
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
57+
min_size: ClassVar[int] = 5 * 1024 ** 2
58+
default_size: ClassVar[int] = 50 * 1024 ** 2
59+
max_size: ClassVar[int] = 5 * 1024 ** 3
60+
max_number: ClassVar[int] = 10000
61+
62+
@property
63+
def size(self) -> int:
64+
return self.end - self.start + 1
65+
66+
@classmethod
67+
def partition(cls, file: File, part_size: int) -> Iterable[Self]:
68+
assert file.size is not None, R('File size unknown', file)
69+
assert cls.min_size <= part_size <= cls.max_size, R('Invalid part size', part_size)
70+
part_count = math.ceil(file.size / part_size)
71+
assert part_count <= 10000, R(
72+
'Part size is too small for this file', part_size, file)
73+
for i in range(part_count):
74+
start = i * part_size
75+
end = min((i + 1) * part_size - 1, file.size)
76+
yield cls(part_number=i + 1, start=start, end=end)
77+
78+
79+
class MirrorService(HasCachedHttpClient):
80+
81+
@cached_property
82+
def _storage(self) -> StorageService:
83+
return StorageService(bucket_name=aws.mirror_bucket)
84+
85+
@cache
86+
def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
87+
return RepositoryPlugin.load(catalog).create(catalog)
88+
89+
def begin_mirroring_file(self, file: File) -> str:
90+
"""
91+
Initiate a multipart upload of the file's content and return the upload
92+
ID.
93+
"""
94+
upload = self._storage.create_multipart_upload(object_key=self.mirror_object_key(file),
95+
content_type=file.content_type)
96+
return upload.id
97+
98+
def mirror_file_part(self,
99+
catalog: CatalogName,
100+
file: File,
101+
part: FilePart,
102+
upload_id: str):
103+
"""
104+
Upload a part of a file to a multipart upload begun with
105+
:meth:`begin_mirroring_file`
106+
"""
107+
upload = self._get_upload(file, upload_id)
108+
file_content = self._download(catalog, file, part)
109+
return self._storage.upload_multipart_part(file_content,
110+
part.part_number,
111+
upload)
112+
113+
def finish_mirroring_file(self,
114+
file: File,
115+
upload_id: str,
116+
etags: Sequence[str]):
117+
"""
118+
Complete a multipart upload begun with :meth:`begin_mirroring_file`.
119+
"""
120+
upload = self._get_upload(file, upload_id)
121+
self._storage.complete_multipart_upload(upload, etags)
122+
123+
def get_mirror_url(self, file: File) -> str:
124+
return self._storage.get_presigned_url(key=self.mirror_object_key(file),
125+
file_name=file.name)
126+
127+
def mirror_object_key(self, file: File) -> str:
128+
return self._file_key('file', file)
129+
130+
def _file_key(self, prefix: str, file: File) -> str:
131+
digest, digest_type = file.digest()
132+
assert all(c in string.hexdigits for c in digest), R(
133+
'Expected a hexadecimal digest', digest)
134+
return f'{prefix}/{digest.lower()}.{digest_type}'
135+
136+
@cache
137+
def _get_repository_url(self, catalog: CatalogName, file: File):
138+
assert config.is_tdr_enabled(catalog), R('Only TDR catalogs are supported', catalog)
139+
assert file.drs_uri is not None, R('File cannot be downloaded', file.uuid)
140+
drs = self.repository_plugin(catalog).drs_client(authentication=None)
141+
access = drs.get_object(file.drs_uri, AccessMethod.gs)
142+
assert access.method is AccessMethod.https, access
143+
return access.url
144+
145+
def _download(self,
146+
catalog: CatalogName,
147+
file: File,
148+
part: FilePart | None = None
149+
) -> bytes:
150+
download_url = self._get_repository_url(catalog, file)
151+
start = time.time()
152+
if part is None:
153+
headers = {}
154+
size = file.size
155+
else:
156+
headers = {'Range': f'bytes={part.start}-{part.end}'}
157+
size = part.end - part.start + 1
158+
# Ideally we would stream the response, but boto only supports uploading
159+
# from streams that are seekable.
160+
response = self._http_client.request('GET', download_url, headers=headers)
161+
if response.status == 206:
162+
log.info('Downloaded %d bytes of file %r in %.3fs',
163+
size, file.uuid, time.time() - start)
164+
return response.data
165+
else:
166+
raise RuntimeError('Unexpected response from repository', response.status)
167+
168+
def _get_upload(self, file: File, upload_id: str) -> 'MultipartUpload':
169+
return self._storage.load_multipart_upload(object_key=self.mirror_object_key(file),
170+
upload_id=upload_id)

0 commit comments

Comments
 (0)