Skip to content

Commit 66b1d3f

Browse files
committed
Encapsulate file mirroring in service class
1 parent b9247d8 commit 66b1d3f

File tree

2 files changed

+125
-51
lines changed

2 files changed

+125
-51
lines changed

scripts/mirror_file.py

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66
import argparse
77
import logging
8-
import string
98
import sys
109

1110
from azul import (
@@ -16,24 +15,15 @@
1615
from azul.args import (
1716
AzulArgumentHelpFormatter,
1817
)
19-
from azul.azulclient import (
20-
AzulClient,
21-
)
22-
from azul.deployment import (
23-
aws,
24-
)
25-
from azul.drs import (
26-
AccessMethod,
27-
)
2818
from azul.http import (
2919
http_client,
3020
)
21+
from azul.indexer.mirror_service import (
22+
MirrorService,
23+
)
3124
from azul.logging import (
3225
configure_script_logging,
3326
)
34-
from azul.plugins import (
35-
File,
36-
)
3727
from azul.plugins.metadata.hca import (
3828
HCAFile,
3929
)
@@ -46,9 +36,6 @@
4636
from azul.service.source_service import (
4737
SourceService,
4838
)
49-
from azul.service.storage_service import (
50-
StorageService,
51-
)
5239

5340
log = logging.getLogger(__name__)
5441

@@ -69,45 +56,18 @@ def get_file(catalog: CatalogName, file_uuid: str) -> HCAFile:
6956
return file
7057

7158

72-
def get_download_url(catalog: CatalogName, file: File) -> str:
73-
drs_uri = file.drs_uri
74-
drs = AzulClient().repository_plugin(catalog).drs_client()
75-
access = drs.get_object(drs_uri, AccessMethod.gs)
76-
assert access.method is AccessMethod.https, access
77-
return access.url
78-
79-
80-
def object_key(file: File) -> str:
81-
digest, digest_type = file.digest()
82-
assert all(c in string.hexdigits for c in digest), R(
83-
'Expected a hexadecimal digest', digest)
84-
return f'file/{digest.lower()}.{digest_type}'
85-
86-
8759
def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
8860
assert config.enable_mirroring, R('Mirroring must be enabled')
8961
assert config.is_tdr_enabled(catalog), R('Only TDR catalogs are supported')
90-
assert config.is_hca_enabled(catalog), R('Only HCA catalogs are supported')
9162
file = get_file(catalog, file_uuid)
92-
download_url = get_download_url(catalog, file)
93-
key = object_key(file)
94-
storage = StorageService(bucket_name=aws.mirror_bucket)
95-
upload = storage.create_multipart_upload(key, content_type=file.content_type)
96-
97-
def upload_part(part: File.PartRange) -> str:
98-
response = http.request('GET',
99-
download_url,
100-
headers={'Range': f'bytes={part.start}-{part.end}'})
101-
if response.status == 206:
102-
return storage.upload_multipart_part(response.data,
103-
part.part_number + 1,
104-
upload)
105-
else:
106-
raise RuntimeError('Unexpected response from repository', response.status)
107-
108-
etags = list(map(upload_part, file.partition(part_size)))
109-
storage.complete_multipart_upload(upload, etags)
110-
return storage.get_presigned_url(key, file.name)
63+
service = MirrorService()
64+
upload_id = service.begin_mirroring_file(file)
65+
etags = [
66+
service.mirror_file_part(catalog, file, part, upload_id)
67+
for part in file.partition(part_size)
68+
]
69+
service.finish_mirroring_file(file, upload_id, etags=etags)
70+
return service.get_mirror_url(file)
11171

11272

11373
def main(argv):

src/azul/indexer/mirror_service.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import string
2+
from typing import (
3+
TYPE_CHECKING,
4+
)
5+
6+
from azul import (
7+
CatalogName,
8+
cache,
9+
cached_property,
10+
config,
11+
)
12+
from azul.deployment import (
13+
R,
14+
aws,
15+
)
16+
from azul.drs import (
17+
AccessMethod,
18+
)
19+
from azul.http import (
20+
HasCachedHttpClient,
21+
)
22+
from azul.plugins import (
23+
File,
24+
RepositoryPlugin,
25+
)
26+
from azul.plugins.metadata.hca import (
27+
HCAFile,
28+
)
29+
from azul.service.storage_service import (
30+
StorageService,
31+
)
32+
33+
if TYPE_CHECKING:
34+
from mypy_boto3_s3.service_resource import (
35+
MultipartUpload,
36+
)
37+
38+
39+
class MirrorService(HasCachedHttpClient):
    """
    Mirrors repository files into the mirror bucket by means of a multipart
    upload. The intended call sequence is :meth:`begin_mirroring_file`, one
    call to :meth:`mirror_file_part` per part, :meth:`finish_mirroring_file`
    and, optionally, :meth:`get_mirror_url`.
    """

    @cached_property
    def _storage(self) -> StorageService:
        # Storage service bound to the mirror bucket of the current deployment
        return StorageService(bucket_name=aws.mirror_bucket)

    @cache
    def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
        """
        Load the repository plugin for the given catalog and return an
        instance of it created for that catalog.
        """
        return RepositoryPlugin.load(catalog).create(catalog)

    def begin_mirroring_file(self, file: File) -> str:
        """
        Initiate a multipart upload of the given file and return the upload ID.

        The object key of the upload is derived from the file's digest, see
        :meth:`mirror_object_key`.
        """
        assert isinstance(file, HCAFile), R('Only HCA catalogs are supported')
        upload = self._storage.create_multipart_upload(object_key=self.mirror_object_key(file),
                                                       content_type=file.content_type)
        # NOTE(review): assumes `upload` is a boto3 `MultipartUpload` resource,
        # as returned by `_get_upload`. On such a resource, `id` is an
        # identifier and therefore always available, whereas `upload_id` is a
        # lazily loaded attribute that cannot be read from a resource that was
        # constructed from identifiers alone.
        return upload.id

    def mirror_file_part(self,
                         catalog: CatalogName,
                         file: File,
                         part: File.PartRange,
                         upload_id: str
                         ) -> str:
        """
        Upload a part of a file to a multipart upload begun with
        :meth:`begin_mirroring_file`

        The part is fetched from the repository with an HTTP range request
        and the value returned by the storage service for the uploaded part
        is passed on to the caller, who may collect these values and hand
        them to :meth:`finish_mirroring_file` as `etags`.

        :raises RuntimeError: if the repository responds to the range request
                              with a status other than 206
        """
        download_url = self._get_repository_url(catalog, file)
        upload = self._get_upload(file, upload_id)
        response = self._http_client.request('GET',
                                             download_url,
                                             headers={'Range': f'bytes={part.start}-{part.end}'})
        # Only a partial-content response is known to carry exactly the
        # requested range
        if response.status != 206:
            raise RuntimeError('Unexpected response from repository', response.status)
        # Presumably `part.part_number` is zero-based while the part numbers
        # of a multipart upload start at one
        return self._storage.upload_multipart_part(response.data,
                                                   part.part_number + 1,
                                                   upload)

    def finish_mirroring_file(self,
                              file: File,
                              upload_id: str,
                              *,
                              etags: list[str] | None
                              ) -> None:
        """
        Complete a multipart upload begun with :meth:`begin_mirroring_file`.
        If ETags are not provided, the caller is responsible for ensuring that
        all previous calls to :meth:`mirror_file_part` were successful.
        """
        upload = self._get_upload(file, upload_id)
        self._storage.complete_multipart_upload(upload, etags)

    def get_mirror_url(self, file: File) -> str:
        """
        Return a presigned URL for the mirrored copy of the given file, using
        the file's name as the file name for the URL.
        """
        return self._storage.get_presigned_url(key=self.mirror_object_key(file),
                                               file_name=file.name)

    def mirror_object_key(self, file: File) -> str:
        """
        Return the key of the object in the mirror bucket that holds, or will
        hold, the content of the given file.
        """
        return self._file_key('file', file)

    def _file_key(self, prefix: str, file: File) -> str:
        """
        Compose an object key of the form `{prefix}/{digest}.{digest_type}`
        with the file's digest in lower case.
        """
        digest, digest_type = file.digest()
        assert all(c in string.hexdigits for c in digest), R(
            'Expected a hexadecimal digest', digest)
        return f'{prefix}/{digest.lower()}.{digest_type}'

    def _get_repository_url(self, catalog: CatalogName, file: File) -> str:
        """
        Resolve the file's DRS URI to a URL from which the file's content can
        be downloaded.
        """
        assert config.is_tdr_enabled(catalog), R('Only TDR catalogs are supported')
        drs = self.repository_plugin(catalog).drs_client(authentication=None)
        # NOTE(review): the `gs` access method is requested but the result is
        # required to be an `https` one. This looks deliberate, confirm
        # against the DRS client.
        access = drs.get_object(file.drs_uri, AccessMethod.gs)
        assert access.method is AccessMethod.https, access
        return access.url

    def _get_upload(self, file: File, upload_id: str) -> 'MultipartUpload':
        """
        Look up the multipart upload with the given ID for the mirror object
        of the given file.
        """
        return self._storage.load_multipart_upload(object_key=self.mirror_object_key(file),
                                                   upload_id=upload_id)

0 commit comments

Comments
 (0)