diff --git a/docs/distributed-ml-training-architecture.png b/docs/distributed-ml-training-architecture.png
new file mode 100644
index 00000000..92522742
Binary files /dev/null and b/docs/distributed-ml-training-architecture.png differ
diff --git a/terraform/ec2-examples/distributed-ml-training/README.md b/terraform/ec2-examples/distributed-ml-training/README.md
new file mode 100644
index 00000000..e84fc22d
--- /dev/null
+++ b/terraform/ec2-examples/distributed-ml-training/README.md
@@ -0,0 +1,171 @@
+# ECS machine learning distributed training
+
+This solution blueprint creates the infrastructure to run distributed training jobs using a [Ray cluster](https://docs.ray.io/en/latest/cluster/getting-started.html) and [PyTorch](https://pytorch.org/).
+
+![Distributed ML architecture](../../../docs/distributed-ml-training-architecture.png)
+
+By default, this blueprint uses g5.xlarge instances (1 GPU each) to showcase multi-node, data-parallel distributed training. You can change the local variable **instance_type_workers** to use larger instances if you need more GPUs. If you change the instance type, you also need to modify the memory, CPU, and GPU values of the worker task and service definition, as well as the container command parameters. The [training script example](./training_example.py) assumes 2 machines with a single GPU each, but this can be changed via the **num_workers** variable.
+
+## Components
+
+* Service discovery: The head node is registered in a private DNS namespace via AWS Cloud Map. This allows worker tasks to discover the head task and join the cluster on startup.
+* 2 auto scaling groups: one for the head instance and another for the worker instances
+* ECS service definitions:
+    * Head service: runs singleton processes responsible for cluster management
+    * Worker service: runs training jobs
+* S3 bucket to store the results
+
+## Deployment
+
+1. Deploy core-infra resources
+
+```shell
+cd ./terraform/ec2-examples/core-infra
+terraform init
+terraform apply -target=module.vpc -target=aws_service_discovery_private_dns_namespace.this
+```
+
+2. Deploy this blueprint
+
+```shell
+cd ../distributed-ml-training
+terraform init
+terraform apply
+```
+
+## Example: training the resnet model with the FashionMNIST dataset
+
+Once the cluster is deployed, you can connect to the EC2 instance running the head container using SSM, and open a bash shell in the container from there. This is only for demonstration purposes: notebooks in [SageMaker](https://aws.amazon.com/sagemaker/) or [Cloud9](https://aws.amazon.com/cloud9/) provide a better user experience for running training jobs in Python than a bash shell.
+
+1. Connect to the instance
+
+```bash
+HEAD_INSTANCE_ID=$(aws ec2 describe-instances \
+  --filters 'Name=tag:Name,Values=ecs-demo-distributed-ml-training-head' 'Name=instance-state-name,Values=running' \
+  --query 'Reservations[*].Instances[0].InstanceId' --output text --region us-west-2
+)
+
+aws ssm start-session --target $HEAD_INSTANCE_ID --region us-west-2
+```
+
+2. Connect to the container
+
+Due to the size of the container images, it might take several minutes until the containers reach a running state. The following command will fail if the container is not running yet.
+
+```bash
+CONTAINER_ID=$(sudo docker ps -qf "name=.*-rayhead-.*")
+sudo docker exec -it $CONTAINER_ID bash
+```
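+
+If the image is still being pulled, you can wait for the head container with a small polling loop before running the command above (a convenience snippet for this walkthrough, not part of the blueprint):
+
+```bash
+until [ -n "$(sudo docker ps -qf 'name=.*-rayhead-.*')" ]; do
+  echo "Waiting for the Ray head container to start..."
+  sleep 15
+done
+```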
+
+3. Inside the container shell, check the cluster status. 3 nodes should be listed as healthy, with 2.0 GPUs available. If you do not see 2.0 GPUs, the workers have not started yet.
+
+```bash
+ray status
+```
+
+Example output:
+
+```
+(...)
+Node status
+---------------------------------------------------------------
+Healthy:
+ 1 node_a3d74b6d5089c52f9848c1529349ba5c4966edaa633374b0566c7d69
+ 1 node_a5a1aa596068c73e17e029ca221bfad7a7b0085a0273da3c7ad86096
+ 1 node_3ae0c0cabb682158fef418bbabdf2ea63820e8b68e4ae2f4b24c8e66
+Pending:
+ (no pending nodes)
+Recent failures:
+ (no failures)
+
+(...)
+
+Resources
+---------------------------------------------------------------
+Usage:
+ 0.0/6.0 CPU
+ 0.0/2.0 GPU
+ 0B/38.00GiB memory
+ 0B/11.87GiB object_store_memory
+
+Demands:
+ (no resource demands)
+
+```
+
+4. Run the [training script example](./training_example.py). It uses distributed data parallel training to split the data across the GPUs. You can read the comments inside the Python script to learn more about each step. A bucket is created as part of the Terraform plan (its name is printed as an output). Make sure to pass the name of that bucket (it starts with "dt-results-") as an argument to the training_example.py script.
+
+```bash
+export RAY_DEDUP_LOGS=0 # Print the logs of each training process instead of deduplicating them
+wget https://raw.githubusercontent.com/aws-ia/ecs-blueprints/main/terraform/ec2-examples/distributed-ml-training/training_example.py
+python training_example.py YOUR_BUCKET_NAME
+```
+
+Example output:
+
+```
+(...)
+Wrapping provided model in DistributedDataParallel.
+(...)
+
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 0 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.660568237304688]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 0 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.65453052520752]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 1 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.172431230545044]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 1 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.17476797103882]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 2 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.807305574417114]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 2 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.807661056518555]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 3 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.16184115409851]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 3 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.164414882659912]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 4 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.43423628807068]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 4 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.430140495300293]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 5 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.319995880126953]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 5 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.331279277801514]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 6 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.402108669281006]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 6 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.385886192321777]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 7 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.865890741348267]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 7 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 16.86034846305847]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 8 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.0880389213562]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 8 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.094018697738647]
+(RayTrainWorker pid=234, ip=10.0.15.42) [Epoch 9 | GPU0: Process rank 1 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.191094160079956]
+(RayTrainWorker pid=227, ip=10.0.2.255) [Epoch 9 | GPU0: Process rank 0 | Batchsize: 128 | Steps: 235 | Total epoch time: 17.189364910125732]
+
+(...)
+╭───────────────────────────────╮
+│ Training result               │
+├───────────────────────────────┤
+│ checkpoint_dir_name           │
+│ time_this_iter_s      182.976 │
+│ time_total_s          182.976 │
+│ training_iteration          1 │
+│ accuracy               0.8852 │
+│ loss                  0.41928 │
+╰───────────────────────────────╯
+
+(...) Total running time: 3min 7s
+
+Result(
+  metrics={'loss': 0.4192830347106792, 'accuracy': 0.8852},
+  path='dt-results-EXAMPLE/ecs_dt_results/TorchTrainer_d1824_00000_0_(...)',
+  filesystem='s3',
+  checkpoint=None
+)
+```
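+
+The run results are stored under the `ecs_dt_results` prefix of the results bucket. You can inspect what was written from any shell with AWS credentials, for example (substituting your bucket name, as before):
+
+```bash
+aws s3 ls s3://YOUR_BUCKET_NAME/ecs_dt_results/ --recursive --region us-west-2
+```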
+
+## Clean up
+
+1. Destroy this blueprint
+
+```shell
+terraform destroy
+```
+
+2. Destroy core-infra resources
+
+```shell
+cd ../core-infra
+terraform destroy
+```
+
+## Support
+
+Please open an issue for questions or unexpected behavior.
diff --git a/terraform/ec2-examples/distributed-ml-training/main.tf b/terraform/ec2-examples/distributed-ml-training/main.tf
new file mode 100644
index 00000000..db34935f
--- /dev/null
+++ b/terraform/ec2-examples/distributed-ml-training/main.tf
@@ -0,0 +1,468 @@
+provider "aws" {
+  region = local.region
+}
+
+data "aws_caller_identity" "current" {}
+
+locals {
+  name                       = "ecs-demo-distributed-ml-training"
+  region                     = "us-west-2"
+  instance_type_workers      = "g5.xlarge"
+  instance_type_head         = "m5.xlarge"
+  ray_head_container_image   = "docker.io/rayproject/ray-ml:2.7.1.artur.c9f4c6-py38"
+  ray_worker_container_image = "docker.io/rayproject/ray-ml:2.7.1.artur.c9f4c6-py38-gpu"
+
+  user_data_head = <<-EOT
+    #!/bin/bash
+    cat <<'EOF' >> /etc/ecs/ecs.config
+    ECS_CLUSTER=${local.name}
+    EOF
+  EOT
+
+  user_data_workers = <<-EOT
+    #!/bin/bash
+    cat <<'EOF' >> /etc/ecs/ecs.config
+    ECS_CLUSTER=${local.name}
+    EOF
+    echo "ip_resolve=4" >> /etc/yum.conf
+  EOT
+
+  tags = {
+    Blueprint  = local.name
+    GithubRepo = "github.com/aws-ia/ecs-blueprints"
+  }
+}
+
+################################################################################
+# ECS Blueprint
+################################################################################
+
+module "ecs_cluster" {
+  source  = "terraform-aws-modules/ecs/aws//modules/cluster"
+  version = "~> 5.0"
+
+  cluster_name = local.name
+  # Capacity provider - autoscaling groups
+  default_capacity_provider_use_fargate = false
+  autoscaling_capacity_providers = {
+    distributed_ml_training_head = {
+      auto_scaling_group_arn = module.autoscaling_head.autoscaling_group_arn
+
+      managed_scaling = {
+        maximum_scaling_step_size = 1
+        minimum_scaling_step_size = 1
+        status                    = "ENABLED"
+        target_capacity           = 60
+      }
+
+      default_capacity_provider_strategy = {
+        weight = 1
+        base   = 1
+      }
+    },
+    distributed_ml_training_workers = {
+      auto_scaling_group_arn = module.autoscaling_workers.autoscaling_group_arn
+
+      managed_scaling = {
+        maximum_scaling_step_size = 1
+        minimum_scaling_step_size = 1
+        status                    = "ENABLED"
+        target_capacity           = 60
+      }
+    },
+  }
+
+  # Shared task execution role
+  create_task_exec_iam_role = false
+  tags                      = local.tags
+}
+
+resource "aws_service_discovery_service" "this" {
+  name = "head"
+  dns_config {
+    namespace_id = data.aws_service_discovery_dns_namespace.core_infra.id
+    dns_records {
+      ttl  = 300
+      type = "A"
+    }
+    routing_policy = "MULTIVALUE"
+  }
+}
+
+resource "aws_placement_group" "workers" {
+  name     = "ml-training"
+  strategy = "cluster"
+}
+
+module "autoscaling_head" {
+  source  = "terraform-aws-modules/autoscaling/aws"
+  version = "~> 6.5"
+
+  name = "${local.name}-head"
+
+  image_id      = jsondecode(data.aws_ssm_parameter.ecs_optimized_ami.value)["image_id"]
+  instance_type = local.instance_type_head
+
+  security_groups                 = [module.autoscaling_sg.security_group_id]
+  user_data                       = base64encode(local.user_data_head)
+  ignore_desired_capacity_changes = true
+
+  create_iam_instance_profile = true
+  iam_role_name               = local.name
+  iam_role_description        = "ECS role for ${local.name}"
+  iam_role_policies = {
+    AmazonEC2ContainerServiceforEC2Role      = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
+    AmazonSSMManagedEC2InstanceDefaultPolicy = "arn:aws:iam::aws:policy/AmazonSSMManagedEC2InstanceDefaultPolicy"
+  }
+
+  vpc_zone_identifier = data.aws_subnets.private.ids
+  health_check_type   = "EC2"
+  min_size            = 1
+  max_size            = 1
+  desired_capacity    = 1
+
+  # https://github.com/hashicorp/terraform-provider-aws/issues/12582
+  autoscaling_group_tags = {
+    AmazonECSManaged = true
+  }
+
+  tags = local.tags
+}
+
+module "autoscaling_workers" {
+  source  = "terraform-aws-modules/autoscaling/aws"
+  version = "~> 6.5"
+
+  name = "${local.name}-workers"
+
+  placement_group = aws_placement_group.workers.name
+  image_id        = jsondecode(data.aws_ssm_parameter.ecs_gpu_optimized_ami.value)["image_id"]
+  instance_type   = local.instance_type_workers
+
+  security_groups                 = [module.autoscaling_sg.security_group_id]
+  user_data                       = base64encode(local.user_data_workers)
+  ignore_desired_capacity_changes = true
+
+  create_iam_instance_profile = true
+  iam_role_name               = local.name
+  iam_role_description        = "ECS role for ${local.name}"
+  iam_role_policies = {
+    AmazonEC2ContainerServiceforEC2Role      = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role",
+    AmazonSSMManagedEC2InstanceDefaultPolicy = "arn:aws:iam::aws:policy/AmazonSSMManagedEC2InstanceDefaultPolicy"
+  }
+
+  vpc_zone_identifier = data.aws_subnets.private.ids
+  health_check_type   = "EC2"
+  min_size            = 2
+  max_size            = 2
+  desired_capacity    = 2
+
+  # https://github.com/hashicorp/terraform-provider-aws/issues/12582
+  autoscaling_group_tags = {
+    AmazonECSManaged = true
+  }
+  block_device_mappings = [
+    {
+      # Root volume
+      device_name = "/dev/xvda"
+      no_device   = 0
+      ebs = {
+        delete_on_termination = true
+        encrypted             = false
+        volume_size           = 50
+        volume_type           = "gp2"
+      }
+    }
+  ]
+
+  tags = local.tags
+}
+
+module "autoscaling_sg" {
+  source  = "terraform-aws-modules/security-group/aws"
+  version = "~> 4.0"
+
+  name        = local.name
+  description = "Autoscaling group security group"
+  vpc_id      = data.aws_vpc.core_infra.id
+
+  ingress_with_cidr_blocks = [
+    {
+      from_port   = -1
+      to_port     = -1
+      protocol    = -1
+      description = "Allow all from VPC CIDR block"
+      cidr_blocks = data.aws_vpc.core_infra.cidr_block
+    },
+  ]
+
+  egress_rules = ["all-all"]
+
+  tags = local.tags
+}
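+
+# The head service below starts Ray with --num-cpus=0, so Ray does not
+# schedule training workloads on the head node: the head only runs the
+# cluster-management processes, while training runs on the GPU workers.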
"terraform-aws-modules/ecs/aws//modules/service" + version = "~> 5.0" + + name = "distributed_ml_training_head_service" + desired_count = 1 + cluster_arn = module.ecs_cluster.arn + enable_autoscaling = false + memory = 10240 + cpu = 3072 + # Task Definition + + requires_compatibilities = ["EC2"] + capacity_provider_strategy = { + default = { + capacity_provider = "distributed_ml_training_head" # needs to match name of capacity provider + weight = 1 + base = 1 + } + } + + task_exec_iam_role_arn = aws_iam_role.task_execution_role.arn + tasks_iam_role_name = "taskRole" + tasks_iam_role_description = "Task role for ${local.name}" + tasks_iam_role_policies = { + ReadOnlyAccess = "arn:aws:iam::aws:policy/ReadOnlyAccess" + } + tasks_iam_role_statements = [ + { + actions = ["s3:*"] + resources = [ + "arn:aws:s3:::${aws_s3_bucket.results.bucket}", + "arn:aws:s3:::${aws_s3_bucket.results.bucket}/*" + ] + } + ] + create_task_exec_iam_role = false + enable_execute_command = false + deployment_minimum_healthy_percent = 0 + container_definitions = { + + ray_head = { + readonly_root_filesystem = false + image = local.ray_head_container_image + user = 1000 + cpu = 3072 + memory = 10240 + memory_reservation = 10240 + command = ["/bin/bash", "-lc", "--", "ulimit -n 65536; ray start --head --dashboard-host=0.0.0.0 --metrics-export-port=8080 --num-cpus=0 --memory=10737418240 --block"] + linux_parameters = { + sharedMemorySize = 20480 + } + mount_points = [{ + sourceVolume = "tmp" + containerPath = "/tmp" + readOnly = false + }] + } + } + volume = { + "tmp" = { + dockerVolumeConfiguration = { + scope = "shared", + driver = "local", + autoprovision = true + } + } + } + + service_registries = { + registry_arn = aws_service_discovery_service.this.arn + } + + network_mode = "awsvpc" + subnet_ids = data.aws_subnets.private.ids + security_group_rules = { + ingress_private_ips = { + type = "ingress" + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["10.0.0.0/8"] + } + egress_all = { + type = "egress" + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + } + + tags = local.tags +} + + +module "ecs_service_workers" { + source = "terraform-aws-modules/ecs/aws//modules/service" + version = "~> 5.0" + deployment_minimum_healthy_percent = 0 + name = "distributed_ml_training_worker_service" + desired_count = 2 + cluster_arn = module.ecs_cluster.arn + enable_autoscaling = false + memory = 15360 + cpu = 3072 + # Task Definition + + requires_compatibilities = ["EC2"] + capacity_provider_strategy = { + default = { + capacity_provider = "distributed_ml_training_workers" # needs to match name of capacity provider + weight = 1 + base = 1 + } + } + + task_exec_iam_role_arn = aws_iam_role.task_execution_role.arn + tasks_iam_role_name = "taskRole" + tasks_iam_role_description = "Task role for ${local.name}" + tasks_iam_role_policies = { + ReadOnlyAccess = "arn:aws:iam::aws:policy/ReadOnlyAccess" + } + tasks_iam_role_statements = [ + { + actions = ["s3:*"] + resources = [ + "arn:aws:s3:::${aws_s3_bucket.results.bucket}", + "arn:aws:s3:::${aws_s3_bucket.results.bucket}/*" + ] + } + ] + + create_task_exec_iam_role = false + enable_execute_command = false + + container_definitions = { + ray_work = { + readonly_root_filesystem = false + image = local.ray_worker_container_image + user = 1000 + cpu = 3072 + memory = 15360 + memory_reservation = 15360 + command = ["/bin/bash", "-lc", "--", "ulimit -n 65536; ray start --block --num-cpus=3 --num-gpus=1 --address=head.default.core-infra.local:6379 
+
+module "ecs_service_workers" {
+  source                             = "terraform-aws-modules/ecs/aws//modules/service"
+  version                            = "~> 5.0"
+  deployment_minimum_healthy_percent = 0
+  name                               = "distributed_ml_training_worker_service"
+  desired_count                      = 2
+  cluster_arn                        = module.ecs_cluster.arn
+  enable_autoscaling                 = false
+  memory                             = 15360
+  cpu                                = 3072
+  # Task Definition
+
+  requires_compatibilities = ["EC2"]
+  capacity_provider_strategy = {
+    default = {
+      capacity_provider = "distributed_ml_training_workers" # needs to match name of capacity provider
+      weight            = 1
+      base              = 1
+    }
+  }
+
+  task_exec_iam_role_arn     = aws_iam_role.task_execution_role.arn
+  tasks_iam_role_name        = "taskRole"
+  tasks_iam_role_description = "Task role for ${local.name}"
+  tasks_iam_role_policies = {
+    ReadOnlyAccess = "arn:aws:iam::aws:policy/ReadOnlyAccess"
+  }
+  tasks_iam_role_statements = [
+    {
+      actions = ["s3:*"]
+      resources = [
+        "arn:aws:s3:::${aws_s3_bucket.results.bucket}",
+        "arn:aws:s3:::${aws_s3_bucket.results.bucket}/*"
+      ]
+    }
+  ]
+
+  create_task_exec_iam_role = false
+  enable_execute_command    = false
+
+  container_definitions = {
+    ray_work = {
+      readonly_root_filesystem = false
+      image                    = local.ray_worker_container_image
+      user                     = 1000
+      cpu                      = 3072
+      memory                   = 15360
+      memory_reservation       = 15360
+      command                  = ["/bin/bash", "-lc", "--", "ulimit -n 65536; ray start --block --num-cpus=3 --num-gpus=1 --address=head.default.core-infra.local:6379 --metrics-export-port=8080 --memory=15032385536"]
+      linux_parameters = {
+        sharedMemorySize = 10240
+      }
+      resource_requirements = [{
+        type  = "GPU"
+        value = 1
+      }]
+      mount_points = [{
+        sourceVolume  = "tmp"
+        containerPath = "/tmp"
+        readOnly      = false
+      }]
+    }
+  }
+  volume = {
+    "tmp" = {
+      dockerVolumeConfiguration = {
+        scope         = "shared",
+        driver        = "local",
+        autoprovision = true
+      }
+    }
+  }
+
+  network_mode = "awsvpc"
+  subnet_ids   = data.aws_subnets.private.ids
+  security_group_rules = {
+    ingress_private_ips = {
+      type        = "ingress"
+      from_port   = 0
+      to_port     = 0
+      protocol    = "-1"
+      cidr_blocks = ["10.0.0.0/8"]
+    }
+    egress_all = {
+      type        = "egress"
+      from_port   = 0
+      to_port     = 0
+      protocol    = "-1"
+      cidr_blocks = ["0.0.0.0/0"]
+    }
+  }
+  tags = local.tags
+}
+
+resource "random_id" "bucket_name" {
+  byte_length = 8
+}
+
+resource "aws_s3_bucket" "results" {
+  bucket        = "dt-results-${random_id.bucket_name.hex}"
+  tags          = local.tags
+  force_destroy = true
+}
+
+resource "aws_iam_role" "task_execution_role" {
+  name = "distributed_training_task_execution_role"
+
+  # Terraform's "jsonencode" function converts a
+  # Terraform expression result to valid JSON syntax.
+  assume_role_policy = jsonencode({
+    "Version" : "2012-10-17",
+    "Statement" : [
+      {
+        "Sid" : "ECSTasksAssumeRole",
+        "Effect" : "Allow",
+        "Principal" : {
+          "Service" : "ecs-tasks.amazonaws.com"
+        },
+        "Action" : "sts:AssumeRole",
+        "Condition" : {
+          "StringEquals" : {
+            "aws:SourceAccount" : data.aws_caller_identity.current.account_id
+          },
+          "ArnLike" : {
+            "aws:SourceArn" : "arn:aws:ecs:${local.region}:${data.aws_caller_identity.current.account_id}:*"
+          }
+        }
+      }
+    ]
+  })
+  managed_policy_arns = [
+    "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
+  ]
+  tags = local.tags
+}
+
+################################################################################
+# Supporting Resources
+################################################################################
+
+# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-optimized_AMI.html#ecs-optimized-ami-linux
+data "aws_ssm_parameter" "ecs_optimized_ami" {
+  name = "/aws/service/ecs/optimized-ami/amazon-linux-2/recommended"
+}
+
+data "aws_ssm_parameter" "ecs_gpu_optimized_ami" {
+  name = "/aws/service/ecs/optimized-ami/amazon-linux-2/gpu/recommended"
+}
+
+data "aws_subnets" "private" {
+  filter {
+    name   = "tag:Name"
+    values = ["core-infra-private-${local.region}a"]
+  }
+}
+
+data "aws_vpc" "core_infra" {
+  filter {
+    name   = "tag:Name"
+    values = ["core-infra"]
+  }
+}
+
+data "aws_service_discovery_dns_namespace" "core_infra" {
+  name = "default.core-infra.local"
+  type = "DNS_PRIVATE"
+}
diff --git a/terraform/ec2-examples/distributed-ml-training/outputs.tf b/terraform/ec2-examples/distributed-ml-training/outputs.tf
new file mode 100644
index 00000000..b0889634
--- /dev/null
+++ b/terraform/ec2-examples/distributed-ml-training/outputs.tf
@@ -0,0 +1,17 @@
+################################################################################
+# Cluster
+################################################################################
+
+output "cluster_arn" {
+  description = "ARN that identifies the cluster"
+  value       = module.ecs_cluster.arn
+}
+
+################################################################################
+# S3
+################################################################################
+
+output "s3_bucket" {
+  description = "Bucket name for results"
+  value       = aws_s3_bucket.results.id
+}
diff --git a/terraform/ec2-examples/distributed-ml-training/training_example.py b/terraform/ec2-examples/distributed-ml-training/training_example.py
new file mode 100644
index 00000000..c9f435bd
--- /dev/null
+++ b/terraform/ec2-examples/distributed-ml-training/training_example.py
@@ -0,0 +1,110 @@
+
+# Import the required torch and ray libraries
+import torch
+from torchvision.models import resnet18
+from torchvision.datasets import FashionMNIST
+from torchvision.transforms import ToTensor, Normalize, Compose
+from torch.utils.data import DataLoader
+from torch.optim import Adam
+from torch.nn import CrossEntropyLoss
+from ray.train.torch import TorchTrainer
+from ray.train import ScalingConfig, RunConfig
+from filelock import FileLock
+import ray
+import time
+import argparse
+
+# Get arguments
+parser = argparse.ArgumentParser()
+parser.add_argument("bucket_name", help="Bucket to publish results.", type=str)
+args = parser.parse_args()
+
+# Connect to the Ray cluster
+ray.init()
+
+# Download the data to the shared storage
+transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
+train_data = FashionMNIST(root='./data',
+                          train=True, download=True,
+                          transform=transform)
+
+# Define the training function that the distributed processes will run
+def train_func(config):
+    import os
+    # The NVIDIA Collective Communications Library (NCCL) implements multi-GPU
+    # and multi-node communication primitives optimized for NVIDIA GPUs.
+    # Since containers can have multiple interfaces, we explicitly set which one
+    # NCCL should use.
+    os.environ['NCCL_SOCKET_IFNAME'] = 'eth0'
+    # os.environ['NCCL_DEBUG'] = 'INFO'  # Uncomment this line to debug NCCL
+    # Set up the model
+    model = resnet18(num_classes=10)
+    model.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(7, 7),
+                                  stride=(2, 2),
+                                  padding=(3, 3),
+                                  bias=False)
+    # Prepare the model for distributed training
+    model = ray.train.torch.prepare_model(model)
+    # Set up the loss and optimizer
+    criterion = CrossEntropyLoss()
+    optimizer = Adam(model.parameters(), lr=0.001)
+    # Retrieve the data from the shared storage.
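+    # Every Ray worker process runs this function, so each one downloads the
+    # dataset; the file lock below serializes the downloads so that concurrent
+    # processes sharing a filesystem do not corrupt the ./data directory.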
+    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
+    with FileLock(os.path.expanduser("./data.lock")):
+        train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
+        # Download the test data from open datasets
+        test_data = FashionMNIST(root="./data", train=False, download=True, transform=transform)
+    batch_size = 128
+    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
+    test_loader = DataLoader(test_data, batch_size=batch_size)
+    # Prepare the dataloaders for distributed training
+    train_loader = ray.train.torch.prepare_data_loader(train_loader)
+    test_loader = ray.train.torch.prepare_data_loader(test_loader)
+    # Define the training loop
+    for epoch in range(10):
+        start = time.time()
+        model.train()
+        for images, labels in train_loader:
+            outputs = model(images)
+            loss = criterion(outputs, labels)
+            optimizer.zero_grad()
+            loss.backward()
+            optimizer.step()
+        print(f"[Epoch {epoch} | GPU{torch.cuda.current_device()}: Process rank {torch.distributed.get_rank()} | Batchsize: {128} | Steps: {len(train_loader)} | Total epoch time: {time.time()-start}]")
+        model.eval()
+        test_loss, num_correct, num_total = 0, 0, 0
+
+        # Calculate the loss and accuracy in the 10th epoch;
+        # you might want to do this in each epoch to detect overfitting as early as possible
+        if epoch == 9:
+            with torch.no_grad():
+                for images, labels in test_loader:
+                    prediction = model(images)
+                    loss = criterion(prediction, labels)
+                    test_loss += loss.item()
+                    num_total += labels.shape[0]
+                    num_correct += (prediction.argmax(1) == labels).sum().item()
+
+            test_loss /= len(test_loader)
+            accuracy = num_correct / num_total
+            # Report metrics and checkpoint to Ray
+            ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})
+
+# The scaling config defines how many worker processes to use for the training,
+# usually equal to the number of GPUs
+scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
+
+# Create the trainer instance
+trainer = TorchTrainer(train_func,
+                       scaling_config=scaling_config,
+                       run_config=RunConfig(
+                           storage_path=f"s3://{args.bucket_name}/",
+                           name="ecs_dt_results")
+                       )
+
+# Run the training
+result = trainer.fit()
+
+# Print the results of the training
+print(result)
diff --git a/terraform/ec2-examples/distributed-ml-training/versions.tf b/terraform/ec2-examples/distributed-ml-training/versions.tf
new file mode 100644
index 00000000..c60e8f58
--- /dev/null
+++ b/terraform/ec2-examples/distributed-ml-training/versions.tf
@@ -0,0 +1,14 @@
+terraform {
+  required_version = ">= 1.0"
+
+  required_providers {
+    aws = {
+      source  = "hashicorp/aws"
+      version = ">= 5.0"
+    }
+    random = {
+      source  = "hashicorp/random"
+      version = ">= 3.6.0"
+    }
+  }
+}