Skip to content

Commit 7a10a86

Browse files
authored
Merge pull request #13 from AllenInstitute/feature/add-fsx-utils
Adding FSx Utilities
2 parents 3f11f7b + ee78b99 commit 7a10a86

File tree

5 files changed

+393
-2
lines changed

5 files changed

+393
-2
lines changed

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
[project.optional-dependencies]
2525
dev = [
2626
"aibs-informatics-test-resources @ git+ssh://git@github.com/AllenInstitute/aibs-informatics-test-resources.git@main",
27-
"boto3-stubs[athena,apigateway,batch,ecr,ecs,efs,essential,logs,secretsmanager,ses,sns,ssm,sts,stepfunctions]",
27+
"boto3-stubs[athena,apigateway,batch,ecr,ecs,efs,essential,fsx,logs,secretsmanager,ses,sns,ssm,sts,stepfunctions]",
2828
"moto[all] ~= 5.0",
2929
]
3030

src/aibs_informatics_aws_utils/batch.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ class BatchJobBuilder:
263263
mount_points: List[MountPointTypeDef] = field(default_factory=list)
264264
volumes: List[VolumeTypeDef] = field(default_factory=list)
265265
privileged: bool = field(default=False)
266+
linux_parameters: Optional[Dict[str, Any]] = field(default=None)
266267
env_base: EnvBase = field(default_factory=EnvBase.from_env)
267268

268269
def __post_init__(self):
@@ -272,7 +273,7 @@ def __post_init__(self):
272273

273274
@property
274275
def container_properties(self) -> ContainerPropertiesTypeDef:
275-
return ContainerPropertiesTypeDef(
276+
container_props = ContainerPropertiesTypeDef(
276277
image=self.image,
277278
command=self.command,
278279
environment=to_key_value_pairs(self.environment),
@@ -281,6 +282,9 @@ def container_properties(self) -> ContainerPropertiesTypeDef:
281282
volumes=self.volumes,
282283
privileged=self.privileged,
283284
)
285+
if self.linux_parameters:
286+
container_props["linuxParameters"] = self.linux_parameters
287+
return container_props
284288

285289
@property
286290
def container_overrides(self) -> ContainerOverridesTypeDef:

src/aibs_informatics_aws_utils/core.py

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from mypy_boto3_ecr import ECRClient
3434
from mypy_boto3_ecs import ECSClient
3535
from mypy_boto3_efs import EFSClient
36+
from mypy_boto3_fsx import FSxClient
3637
from mypy_boto3_lambda import LambdaClient
3738
from mypy_boto3_logs import CloudWatchLogsClient
3839
from mypy_boto3_s3 import S3Client, S3ServiceResource
@@ -54,6 +55,7 @@
5455
ECRClient = object
5556
ECSClient = object
5657
EFSClient = object
58+
FSxClient = object
5759
GetCallerIdentityResponseTypeDef = dict
5860
LambdaClient = object
5961
S3Client, S3ServiceResource = object, object
@@ -194,6 +196,7 @@ def client_error_code_check(client_error: ClientError, *error_codes: str) -> boo
194196
"ecr",
195197
"ecs",
196198
"efs",
199+
"fsx",
197200
"lambda",
198201
"logs",
199202
"s3",
@@ -329,6 +332,7 @@ class AWSService:
329332
ECR = AWSServiceProvider[ECRClient]("ecr")
330333
ECS = AWSServiceProvider[ECSClient]("ecs")
331334
EFS = AWSServiceProvider[EFSClient]("efs")
335+
FSX = AWSServiceProvider[FSxClient]("fsx")
332336
LAMBDA = AWSServiceProvider[LambdaClient]("lambda")
333337
LOGS = AWSServiceProvider[CloudWatchLogsClient]("logs")
334338
S3 = AWSServiceAndResourceProvider[S3Client, S3ServiceResource]("s3")

src/aibs_informatics_aws_utils/fsx.py

+262
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
from __future__ import annotations
2+
3+
import re
4+
from math import ceil
5+
from re import Pattern
6+
7+
__all__ = [
8+
"get_file_system",
9+
"list_file_systems",
10+
]
11+
import logging
12+
from pathlib import Path
13+
from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Tuple, Union
14+
15+
from aibs_informatics_core.collections import ValidatedStr
16+
from aibs_informatics_core.utils.decorators import retry
17+
from aibs_informatics_core.utils.tools.dicttools import remove_null_values
18+
from botocore.exceptions import ClientError
19+
20+
from aibs_informatics_aws_utils.core import AWSService, client_error_code_check
21+
22+
if TYPE_CHECKING: # pragma: no cover
23+
from mypy_boto3_fsx.type_defs import (
24+
DataRepositoryAssociationTypeDef,
25+
FileSystemTypeDef,
26+
FilterTypeDef,
27+
TagTypeDef,
28+
)
29+
else:
30+
FileSystemTypeDef = dict
31+
TagTypeDef = dict
32+
33+
34+
logger = logging.getLogger(__name__)
35+
36+
get_fsx_client = AWSService.FSX.get_client
37+
38+
StrPath = Union[Path, str]
39+
40+
41+
class FSxFileSystemId(ValidatedStr):
42+
regex_pattern: ClassVar[Pattern] = re.compile(r"fs-[0-9a-f]{17}")
43+
44+
45+
FileSystemIdOrName = Union[FSxFileSystemId, str]
46+
FileSystemNameOrId = Union[FSxFileSystemId, str]
47+
48+
49+
def split_name_and_id(
50+
id_or_name: Optional[FileSystemNameOrId],
51+
) -> Tuple[Optional[str], Optional[FSxFileSystemId]]:
52+
"""Identify file system identifier as name or id
53+
54+
Examples:
55+
INP: NAME1
56+
OUT: (NAME1, None)
57+
58+
INP: ID1
59+
OUT: (None, ID1)
60+
61+
INP: None
62+
OUT: (None, None)
63+
64+
Args:
65+
id_or_name (Optional[FileSystemNameOrId]): File system id or name.
66+
67+
68+
Returns:
69+
Tuple[Optional[str], Optional[FSxFileSystemId]]: Tuple of name and id.
70+
"""
71+
if not id_or_name:
72+
return None, None
73+
elif FSxFileSystemId.is_valid(id_or_name):
74+
return None, FSxFileSystemId(id_or_name)
75+
return id_or_name, None
76+
77+
78+
def split_name_and_ids(
79+
names_or_ids: List[FileSystemNameOrId],
80+
) -> Tuple[List[str], List[FSxFileSystemId]]:
81+
"""Split file system combined list of names and ids into separate lists.
82+
83+
Example:
84+
INP: [NAME1, ID1, NAME2, NAME3, ID2]
85+
OUT: ([NAME1, NAME2, NAME3], [ID1, ID2])
86+
87+
Args:
88+
names_or_ids (List[FileSystemNameOrId]): List of names and/or ids.
89+
90+
Returns:
91+
Tuple[List[str], List[FSxFileSystemId]]: Tuple of name and id lists.
92+
"""
93+
if not names_or_ids:
94+
return [], []
95+
names, file_system_ids = zip(*(split_name_and_id(name_or_id) for name_or_id in names_or_ids))
96+
return [name for name in names if name], [
97+
file_system_id for file_system_id in file_system_ids if file_system_id
98+
]
99+
100+
101+
def resolve_file_system_ids(*name_or_ids: FileSystemNameOrId) -> List[FSxFileSystemId]:
102+
"""Resolve file system ids from file system names or ids.
103+
104+
Args:
105+
name_or_ids (Tuple[FileSystemNameOrId]): File system names or ids.
106+
107+
Returns:
108+
str: File system id.
109+
"""
110+
file_system_ids: List[FSxFileSystemId] = []
111+
for name_or_id in name_or_ids:
112+
name, file_system_id = split_name_and_id(name_or_id)
113+
if file_system_id:
114+
file_system_ids.append(file_system_id)
115+
else:
116+
file_system = get_file_system(name)
117+
if not file_system:
118+
raise ValueError(f"File system not found with name: {name}")
119+
file_system_ids.append(FSxFileSystemId(file_system.get("FileSystemId", "")))
120+
return file_system_ids
121+
122+
123+
def get_file_system(
124+
name_or_id: Optional[FileSystemNameOrId] = None,
125+
tags: Optional[Dict[str, str]] = None,
126+
) -> FileSystemTypeDef:
127+
"""Get FSx file system.
128+
129+
Args:
130+
file_system_id (Optional[str], optional): File system id.
131+
name (Optional[str], optional): File system name.
132+
tags (Optional[Dict[str, str]], optional): File system tags.
133+
134+
Returns:
135+
FileSystemDescriptionTypeDef: File system description.
136+
"""
137+
if not name_or_id and not tags:
138+
raise ValueError("At least one of file_system_id, name or tags must be provided.")
139+
140+
file_systems = list_file_systems(name_or_ids=[name_or_id] if name_or_id else None, tags=tags)
141+
if len(file_systems) > 1:
142+
raise ValueError(
143+
f"Multiple file systems found with name/id: {name_or_id} and tags: {tags}"
144+
)
145+
if not file_systems:
146+
raise ValueError(f"File system not found with name/id: {name_or_id} and tags: {tags}")
147+
return file_systems[0]
148+
149+
150+
def list_file_systems(
151+
name_or_ids: Optional[List[FileSystemNameOrId]] = None,
152+
tags: Optional[Dict[str, str]] = None,
153+
**kwargs,
154+
) -> List[FileSystemTypeDef]:
155+
"""List FSx file systems.
156+
157+
You can filter on id, name and tags.
158+
159+
Args:
160+
file_system_id (Optional[str], optional): Optionally filter on file system id.
161+
name (Optional[str], optional): Optionally filter on name.
162+
tags (Optional[Dict[str, str]], optional): Optionally filter on tags.
163+
164+
Returns:
165+
List[FileSystemDescriptionTypeDef]: List of file systems.
166+
"""
167+
client = get_fsx_client(**kwargs)
168+
paginator = client.get_paginator("describe_file_systems")
169+
names, file_system_ids = split_name_and_ids(name_or_ids or [])
170+
if file_system_ids:
171+
response_iter = paginator.paginate(FileSystemIds=file_system_ids)
172+
else:
173+
response_iter = paginator.paginate()
174+
file_systems: List[FileSystemTypeDef] = []
175+
for response in response_iter:
176+
filtered_file_systems = response["FileSystems"]
177+
if not names and not tags:
178+
file_systems.extend(filtered_file_systems)
179+
continue
180+
old_filtered_file_systems = filtered_file_systems
181+
filtered_file_systems = []
182+
for fs in old_filtered_file_systems:
183+
fs_tags_dict = {tag["Key"]: tag["Value"] for tag in fs.get("Tags", [])}
184+
if names and not fs_tags_dict.get("Name") in names:
185+
continue
186+
if tags and not all(
187+
tag in fs_tags_dict and value == fs_tags_dict[tag] for tag, value in tags.items()
188+
):
189+
continue
190+
filtered_file_systems.append(fs)
191+
file_systems.extend(filtered_file_systems)
192+
return file_systems
193+
194+
195+
def list_data_repository_associations(
196+
name_or_id: Optional[FileSystemIdOrName],
197+
filters: Optional[List[FilterTypeDef]] = None,
198+
# TODO: should I include data repository paths?
199+
data_repository_paths: Optional[List[str]] = None,
200+
**kwargs,
201+
) -> List[DataRepositoryAssociationTypeDef]:
202+
"""List data repository associations for a file system.
203+
204+
Args:
205+
file_system_id (str): File system id.
206+
207+
Returns:
208+
List[FileSystemTypeDef]: List of data repository associations.
209+
"""
210+
client = get_fsx_client(**kwargs)
211+
if name_or_id:
212+
file_system_id = resolve_file_system_ids(name_or_id)
213+
if filters:
214+
if id_filter := next(
215+
(filter for filter in filters if filter.get("Name") == "file-system-id"), None
216+
):
217+
id_filter["Values"] = list(id_filter.get("Values", [])) + [file_system_id]
218+
else:
219+
filters.append({"Name": "file-system-id", "Values": file_system_id})
220+
else:
221+
filters = [{"Name": "file-system-id", "Values": file_system_id}]
222+
if not filters:
223+
filters = []
224+
associations: List[DataRepositoryAssociationTypeDef] = []
225+
response = client.describe_data_repository_associations(Filters=filters)
226+
while response["Associations"]:
227+
new_associations = response["Associations"]
228+
if data_repository_paths:
229+
new_associations = [
230+
association
231+
for association in new_associations
232+
if association.get("DataRepositoryPath") in data_repository_paths
233+
]
234+
associations.extend(new_associations)
235+
if not (next_token := response.get("NextToken")):
236+
break
237+
response = client.describe_data_repository_associations(
238+
Filters=filters, NextToken=next_token
239+
)
240+
return associations
241+
242+
243+
def calculate_size_required(bytes_required: int) -> int:
244+
"""Calculate size of file system for the given bytes specified.
245+
246+
FSx file systems are created with a size of
247+
- 1.2 TB,
248+
- 2.4 TB
249+
- any multiple 2.4 TB.
250+
251+
Args:
252+
bytes_required (int): Bytes required.
253+
254+
Returns:
255+
int: Size required.
256+
"""
257+
BYTES_IN_TB = 1024 * 1024 * 1024 * 1024
258+
if bytes_required <= 1.2 * BYTES_IN_TB:
259+
return ceil(1.2 * BYTES_IN_TB)
260+
if bytes_required <= 2.4 * BYTES_IN_TB:
261+
return ceil(2.4 * BYTES_IN_TB)
262+
return ceil((bytes_required // (2.4 * BYTES_IN_TB) + 1) * 2.4 * BYTES_IN_TB)

0 commit comments

Comments
 (0)