Skip to content

Commit 37c4900

Browse files
committed
fixup! Encapsulate file mirroring in service class
1 parent 43f61a8 commit 37c4900

File tree

2 files changed

+54
-25
lines changed

2 files changed

+54
-25
lines changed

scripts/mirror_file.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
6565
upload_id = service.begin_mirroring_file(file)
6666

6767
def mirror_parts():
68-
part = FilePart.head(file, part_size)
68+
part = FilePart.first(file, part_size)
6969
while part is not None:
7070
yield service.mirror_file_part(catalog, file, part, upload_id)
7171
part = part.next(file)

src/azul/indexer/mirror_service.py

+53-24
Original file line numberDiff line numberDiff line change
@@ -49,40 +49,69 @@
4949

5050
@attrs.frozen(auto_attribs=True, kw_only=True)
5151
class FilePart(SerializableAttrs):
52-
part_number: int # Starts at 1
53-
start: int
54-
end: int # Included in the part
55-
56-
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
52+
"""
53+
A part of a mirrored file
54+
"""
55+
#: The part number, starting at 0 for the first part. Note that the S3 API
56+
#: numbers parts starting at 1.
57+
#:
58+
index: int
59+
60+
#: Offset of the first byte of this part, relative to the start of the file
61+
offset: int
62+
63+
#: The size of this part
64+
#:
65+
size: int
66+
67+
#: Various quotas related to parts and part sizes
68+
#: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
69+
#:
5770
min_size: ClassVar[int] = 5 * 1024 ** 2
58-
default_size: ClassVar[int] = 50 * 1024 ** 2
5971
max_size: ClassVar[int] = 5 * 1024 ** 3
60-
max_number: ClassVar[int] = 10000
61-
62-
@property
63-
def size(self) -> int:
64-
return self.end - self.start + 1
72+
max_num_parts: ClassVar[int] = 10000
73+
74+
#: We observe a download rate of ~14 MB/s. Download time should ideally be
75+
#: 1/4 of the Lambda timeout. Since we track the ETag of each part in SQS
76+
#: messages, message size becomes another constraint: we observe ETags to be
77+
#: 32 byte hexadecimal strings which, if represented in a JSON array, take
78+
#: up 35 bytes per item, 36 if the comma is followed by a space. With a
79+
#: maximum SQS message size of 256 KiB, we can store approximately 7280
80+
#: ETags in an SQS message, so the largest file we can mirror using a part
81+
#: size of 256 MiB is 1.5 TiB.
82+
#:
83+
default_size: ClassVar[int] = 256 * 1024 ** 2
6584

6685
@classmethod
67-
def head(cls, file: File, part_size: int) -> Self:
68-
assert file.size is not None, R('File size unknown', file)
69-
assert cls.min_size <= part_size <= cls.max_size, R('Invalid part size', part_size)
86+
def first(cls, file: File, part_size: int) -> Self:
87+
"""
88+
The first part of the given file, using the given part size.
89+
"""
90+
assert file.size is not None, R(
91+
'File size unknown', file)
92+
assert cls.min_size <= part_size <= cls.max_size, R(
93+
'Invalid part size', part_size)
7094
part_count = math.ceil(file.size / part_size)
71-
assert part_count <= 10000, R(
95+
assert part_count <= cls.max_num_parts, R(
7296
'Part size is too small for this file', part_size, file)
73-
return cls(part_number=1, start=0, end=min(part_size, file.size))
97+
return cls(index=0, offset=0, size=min(part_size, file.size))
7498

7599
def next(self, file: File) -> Self | None:
100+
"""
101+
The part following this part in the given file, or None if this is the
102+
last part.
103+
"""
76104
assert file.size is not None, R('File size unknown', file)
77-
if self.end == file.size:
105+
stop = self.offset + self.size
106+
if stop == file.size:
78107
return None
79-
elif self.end < file.size:
108+
elif 0 < stop < file.size:
80109
return attr.evolve(self,
81-
part_number=self.part_number + 1,
82-
start=self.end + 1,
83-
end=min(self.end + self.size, file.size))
110+
index=self.index + 1,
111+
offset=stop,
112+
size=min(self.size, file.size - stop))
84113
else:
85-
assert False, R('Invalid part range for this file', self, file)
114+
assert False, R('Part range exceeds file size', self, file)
86115

87116

88117
class MirrorService(HasCachedHttpClient):
@@ -116,7 +145,7 @@ def mirror_file_part(self,
116145
upload = self._get_upload(file, upload_id)
117146
file_content = self._download(catalog, file, part)
118147
return self._storage.upload_multipart_part(file_content,
119-
part.part_number,
148+
part.index + 1,
120149
upload)
121150

122151
def finish_mirroring_file(self,
@@ -163,7 +192,7 @@ def _download(self,
163192
size = file.size
164193
expected_status = 200
165194
else:
166-
headers = {'Range': f'bytes={part.start}-{part.end}'}
195+
headers = {'Range': f'bytes={part.offset}-{part.offset + part.size - 1}'}
167196
size = part.size
168197
expected_status = 206
169198
# Ideally we would stream the response, but boto only supports uploading

0 commit comments

Comments
 (0)