diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..91afc97 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,27 @@ +## Ticket + +[JIR-123](https://intterragroup.atlassian.net/browse/JIR-123) + +## Background + +What's the intent of this PR? What problem does it solve? + +What's the scope of the work? + +## Code Review + +### Points of Focus + +What should reviewers pay particular attention to (new patterns implemented, questions about how something should work, other parts of the app impacted, etc.)? + +### Test Plan + +Describe how reviewers should know your code works as expected. + +1. Do this. +2. Look at that. +3. Done! + +## Optional Fun + +Add a GIF indicating how the PR makes you feel diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..0b28453 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,36 @@ +name: Builds + +on: + push: + tags: + - "v*" + +jobs: + build: + runs-on: windows-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: 3.x + + - name: Install dependencies + run: | + python -m pip install --upgrade pip setuptools pyinstaller + python -m pip install . 
+ + - name: Build executable + run: | + pyinstaller --onefile main.py -n airborne-dsa + + - name: Upload executable + uses: svenstaro/upload-release-action@2.7.0 + with: + repo_token: ${{ secrets.GITHUB_TOKEN }} + file: dist/airborne-dsa.exe + asset_name: airborne-dsa-windows-amd64.exe + tag: ${{ github.ref }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..b47b0cc --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: Tests + +on: + pull_request: + branches: + - main + +jobs: + test: + name: Run Tests + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: 3.x + + - name: Install dependencies + run: | + python3 -m pip install --upgrade pip + pip3 install . + + - name: Run tests + run: python3 -m unittest diff --git a/.gitignore b/.gitignore index 1934270..95f9cc6 100644 --- a/.gitignore +++ b/.gitignore @@ -137,4 +137,8 @@ dmypy.json # Cython debug symbols cython_debug/ -*.txt \ No newline at end of file +*.txt + +# Custom +missions +config.json \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index cc67606..e0009a2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,3 @@ { - "python.linting.pylintEnabled": true, - "python.linting.enabled": true + "python.analysis.typeCheckingMode": "basic" } \ No newline at end of file diff --git a/README.md b/README.md index ea377ac..32e3923 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,31 @@ # airborne-dsa-lite -Data shipping script for airborne products + +Data shipping script for airborne products + +## Local Development + +### Prerequisites + +- Python 3 installed +- Python 3 venv installed +- `python3 -m venv env` +- `source env/bin/activate` +- `python3 -m pip install --upgrade pip setuptools pyinstaller` + +### Running locally ## Getting started -* Make sure you have 
python 3 installed -* `pip install awscli boto3` -* `aws configure` -* Set the correct bucket name in `settings.py` -Run: `python main.py` or `intterra-airborne-dsa.bat` +- `source env/bin/activate` +- `python3 -m pip install -e .` +- `python3 main.py` + +For running locally, add "storageMode": "local" to config.json + +## Generating build -Drop KMLs or tifs (*.tif and *.tiff) into this directory +- `pyinstaller main.py --onefile -n airborne-dsa` +## Running unit tests +- `python3 -m unittest` diff --git a/intterra-airborne-dsa.bat b/intterra-airborne-dsa.bat deleted file mode 100644 index 134732e..0000000 --- a/intterra-airborne-dsa.bat +++ /dev/null @@ -1,2 +0,0 @@ -python main.py -pause \ No newline at end of file diff --git a/main.py b/main.py index f1d6737..dec5e97 100644 --- a/main.py +++ b/main.py @@ -1,146 +1,225 @@ -#!/usr/bin/env python3 +"""Main file""" -# README -# pip install awscli -# aws configure - -import boto3 -from datetime import datetime +from datetime import datetime, timezone +from pathlib import Path +import re import sys -import glob import os import time -import re -import settings -from signal import signal, SIGINT -s3 = boto3.client('s3') # TODO: pass keys from json config (optionally) - - -def run(): - # init - bucket = settings.BUCKET # TODO: Convert to persistant .json config - now = datetime.utcnow() - dir_path = os.path.dirname(os.path.realpath(__file__)) - welcome() - - # Get current working directory (CWD) and mission name from args - mission_name, mission_timestamp, mission_file_path = find_mission() - if mission_name == None: - mission_name = get_mission_from_input() - create_mission(mission_name, bucket) - else: - answer = '' - while answer != 'yes' and answer != 'no': +from typing import Tuple +from watchdog.observers import Observer +from models.product import Product +from services.config_manager import ConfigManager + +from services.file_watcher import FileWatcher +from services.local_file_manager import LocalFileManager 
+from services.s3_file_manager import S3FileManager + +# from airborne_dsa.config_manager import ConfigManager + +# If running from executable file, path is determined differently +root_directory = os.path.dirname( + os.path.realpath(sys.executable) + if getattr(sys, "frozen", False) + else os.path.realpath(__file__) +) + + +def get_mission_details() -> Tuple[str, datetime]: + """Get mission name and time from input""" + + RESET = "\033[0m" # Reset all formatting + GREEN = "\033[92m" # Green text + + print(f"{GREEN}Enter Mission Name:{RESET}") + # Replace special characters in input with a dash + mission_name = re.sub(r"[^a-zA-Z0-9\s-]", "-", input().replace(" ", "-")) + print() + print(f"{GREEN}Enter local time (format: YYYY-MM-DD HH:MM) [default now]:{RESET}") + try: + mission_time = ( + datetime.now() + .replace(second=0) + .replace(microsecond=0) + .astimezone(timezone.utc) + .replace(tzinfo=None) + ) + + # If input provided, use that instead of current time + if mission_time_input := input(): + mission_time = ( + datetime.strptime(mission_time_input, "%Y-%m-%d %H:%M") + .astimezone(timezone.utc) + .replace(tzinfo=None) + ) + + except ValueError: + print("Invalid datetime provided") + sys.exit(1) + print() + + return mission_name, mission_time + + +def mkdir_ignore_file_exist(file_path: str) -> None: + """Creates a directory using a file path and ignores FileExistsError""" + try: + Path(file_path).mkdir() + except FileExistsError: + pass + + +def create_mission_scaffolding(mission_name: str, mission_time: datetime) -> str: + """Create mission folder scaffolding for upload. 
Returns the mission base path""" + + mkdir_ignore_file_exist(f"{root_directory}/missions") + mission_base_path = f"{root_directory}/missions/{mission_time.isoformat()[:-3].replace(':', '')}_{mission_name}" + mkdir_ignore_file_exist(mission_base_path) + + mkdir_ignore_file_exist(f"{mission_base_path}/images") + mkdir_ignore_file_exist(f"{mission_base_path}/images/EO") + mkdir_ignore_file_exist(f"{mission_base_path}/images/HS") + mkdir_ignore_file_exist(f"{mission_base_path}/images/IR") + + mkdir_ignore_file_exist(f"{mission_base_path}/tactical") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/Detection") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/DPS") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/HeatPerim") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/IntenseHeat") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/IsolatedHeat") + mkdir_ignore_file_exist(f"{mission_base_path}/tactical/ScatteredHeat") + + mkdir_ignore_file_exist(f"{mission_base_path}/videos") + + return mission_base_path + + +def create_product_from_file_path(file_path: str) -> Product: + """Takes in a file path and returns a Product""" + + product = None + last_modified_on = datetime.fromtimestamp(os.path.getmtime(file_path)).astimezone( + timezone.utc + ) + + if "images" in file_path: + if "EO" in file_path: + product = Product("image", "EO", last_modified_on) + if "HS" in file_path: + product = Product("image", "HS", last_modified_on) + if "IR" in file_path: + product = Product("image", "IR", last_modified_on) + elif "tactical" in file_path: + if "Detection" in file_path: + product = Product("tactical", "Detection", last_modified_on) + if "DPS" in file_path: + product = Product("tactical", "DPS", last_modified_on) + if "HeatPerim" in file_path: + product = Product("tactical", "HeatPerim", last_modified_on) + if "IntenseHeat" in file_path: + product = Product("tactical", "IntenseHeat", last_modified_on) + if "IsolatedHeat" in file_path: + 
product = Product("tactical", "IsolatedHeat", last_modified_on) + if "ScatteredHeat" in file_path: + product = Product("tactical", "ScatteredHeat", last_modified_on) + elif "videos" in file_path: + product = Product("video", None, last_modified_on) + + if product is None: + raise ValueError(f"Failed to map product: {os.path.basename(file_path)}") + + return product + + +def get_product_s3_key(mission_name: str, product: Product, file_extension: str) -> str: + folder = None + product_subtype = None + + if product.type == "image": + folder = "IMAGERY" + + if product.subtype == "EO": + product_subtype = "EOimage" + elif product.subtype == "HS": + product_subtype = "HSimage" + elif product.subtype == "IR": + product_subtype = "IRimage" + + elif product.type == "tactical": + folder = "TACTICAL" + product_subtype = product.subtype + elif product.type == "video": + folder = "VIDEO" + product_subtype = "Video" + + return f"{folder}/{product.timestamp.strftime('%Y%m%d_%H%M%SZ')}_{mission_name}_{product_subtype}{file_extension}" + + +def main() -> None: + """Entry point""" + + # Setup + config = ConfigManager("config.json") + file_manager = ( + S3FileManager( + config.aws_access_key_id, config.aws_secret_access_key, config.bucket + ) + if config.storage_mode == "remote" + else LocalFileManager() + ) + + mission_name, mission_time = get_mission_details() + # Create mission file + try: + file_manager.upload_empty_file( + f"MISSION/{mission_name}_{mission_time.strftime('%Y%m%d_%H%M')}Z.txt" + ) + print(f"Created mission: {mission_name}") + except Exception as error: + print(f"Failed to create mission: {str(error)}") + sys.exit(1) + + mission_base_path = create_mission_scaffolding(mission_name, mission_time) + + # Handle new files + def upload_product(file_path: str) -> None: + try: + product = create_product_from_file_path(file_path) + key = get_product_s3_key( + mission_name, product, os.path.splitext(file_path)[1] + ) + + print(f"Uploading {os.path.basename(file_path)}") + 
file_manager.upload_file(file_path, key) print( - f'Use current mission (yes/no): "{mission_name}" from {mission_timestamp.ctime()}?') - answer = sys.stdin.readline().rstrip('\n').rstrip('\n') - if answer == 'no': - mission_name = get_mission_from_input() - os.remove(mission_file_path) - create_mission(mission_name, bucket) - - # listen for files - print( - f'Listening for files in mission "{mission_name}" at {now.ctime()} UTC in bucket "{bucket}"...') - print('(CTRL + C to exit)') - while True: - upload_kmls(mission_name, dir_path, bucket) - upload_tifs(mission_name, dir_path, bucket) - time.sleep(15) - - -def get_mission_from_input(): - name = '' - while not re.match(r'^[0-9a-z]+$', name, re.IGNORECASE): - print('Please enter mission name (alphanumeric): ') - name = sys.stdin.readline().rstrip('\n').rstrip('\n') - return name - - -def create_mission(mission_name, bucket): - # create mission - now = datetime.utcnow() - dir_path = os.path.dirname(os.path.realpath(__file__)) - mission_file_name = f'{mission_name}_{now.strftime("%Y%m%d")}_{now.strftime("%H%M")}Z.txt' - mission_path = f'{dir_path}/{mission_file_name}' - with open(mission_path, 'w+') as f: - f.write('') - s3.upload_file(mission_path, bucket, f'MISSION/{mission_file_name}') - time.sleep(1) - - -def find_mission(): - dir_path = os.path.dirname(os.path.realpath(__file__)) - for file_path in glob.iglob(f'{dir_path}/*.txt', recursive=False): - file_name = os.path.basename(file_path) - m = re.match( - r'([a-z0-9]+)_([0-9]{8})_([0-9]{4})z\.txt', file_name, re.IGNORECASE) - if m != None: - mission_timestamp = datetime.strptime( - f'{m.group(2)}{m.group(3)}', '%Y%m%d%H%M') - return (m.group(1), mission_timestamp, file_name) - return None, None, None - - -def upload_kmls(mission_name, path, bucket): - # find KMLs (eg: 20200831_193612Z_Crawl1_IntenseHeat.kml) - for file_path in glob.iglob(f'{path}/*.kml', recursive=False): - - now = datetime.utcnow() - new_obj = 
f'TACTICAL/{now.strftime("%Y%m%d")}_{now.strftime("%H%M%S")}Z_{mission_name}_IntenseHeat.kml' - print(f'Uploading {file_path} to {new_obj}...') - - # Strip points with regex - with open(file_path, 'r+') as f: - contents = f.read() - with open(file_path, 'w') as f: - modified_contents = re.sub( - r'[-.\n\t<>a-z0-9,\/]+<\/Point>', '', contents, flags=re.MULTILINE) - f.write(modified_contents) - - s3.upload_file(file_path, bucket, new_obj) - os.remove(file_path) - print('done!') - time.sleep(1) - - -def upload_tifs(mission_name, path, bucket): - # find IRs (eg: 20200818_031612Z_Crawl1_IRimage.tif) - for file_path in glob.iglob(f'{path}/*.tif', recursive=False): - now = datetime.utcnow() - new_obj = f'IMAGERY/{now.strftime("%Y%m%d")}_{now.strftime("%H%M%S")}Z_{mission_name}_IRimage.tif' - print(f'Uploading {file_path} to {new_obj}...') - s3.upload_file(file_path, bucket, new_obj) - os.remove(file_path) - print('done!') - time.sleep(1) - - for file_path in glob.iglob(f'{path}/*.tiff', recursive=False): - now = datetime.utcnow() - new_obj = f'IMAGERY/{now.strftime("%Y%m%d")}_{now.strftime("%H%M%S")}Z_{mission_name}_IRimage.tif' - print(f'Uploading {file_path} to {new_obj}...') - s3.upload_file(file_path, bucket, new_obj) - os.remove(file_path) - print('done!') - time.sleep(1) - - -def welcome(): - print(" _____ .__ ___. 
________ _________ _____ ") - print(" / _ \\ |__|_____\\_ |__ ___________ ____ ____ \\______ \\ / _____/ / _ \\ ") - print(" / /_\\ \\| \\_ __ \\ __ \\ / _ \\_ __ \\/ \\_/ __ \\ | | \\ \\_____ \\ / /_\\ \\ ") - print("/ | \\ || | \\/ \\_\\ ( <_> ) | \\/ | \\ ___/ | ` \\/ \\/ | \\") - print("\\____|__ /__||__| |___ /\\____/|__| |___| /\\___ > /_______ /_______ /\\____|__ /") - print(" \\/ \\/ \\/ \\/ \\/ \\/ \\/ ") - print("Airborne API Data Shipping App powered by Intterra") - print('\n') - - -def handler(signal_received, frame): - print('Goodbye!') - sys.exit(0) + f"Successfully uploaded {os.path.basename(file_path)} as {key} to {config.bucket}" + ) + + except Exception as error: + print(error) + + # Scan mission folder for new files + file_watcher = FileWatcher(upload_product) + observer = Observer() + observer.schedule(file_watcher, mission_base_path, recursive=True) + observer.start() + + print(f"Watching for new files in ${mission_base_path}") + print() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + observer.join() if __name__ == "__main__": - signal(SIGINT, handler) - run() + try: + main() + except Exception as error: + log_file = open("ERROR.txt", "w") + log_file.write(str(error)) + log_file.close() + print(error) diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/models/product.py b/models/product.py new file mode 100644 index 0000000..17b0e8b --- /dev/null +++ b/models/product.py @@ -0,0 +1,51 @@ +from datetime import datetime + + +class Product: + def __init__(self, type: str, subtype: str | None, timestamp: datetime) -> None: + if type not in ["image", "tactical", "video"]: + raise ValueError(f"Invalid product type: {type}]") + self._type = type + + if type == "image": + if subtype not in ["EO", "HS", "IR"]: + raise ValueError(f"Invalid image product subtype: {subtype}]") + elif type == "tactical": + if subtype not in [ + "Detection", + "DPS", + 
"HeatPerim", + "IntenseHeat", + "IsolatedHeat", + "ScatteredHeat", + ]: + raise ValueError(f"Invalid tactical product subtype: {subtype}]") + self._subtype = subtype + + self._timestamp = timestamp + + @property + def type(self) -> str: + """Product type""" + return self._type + + @property + def subtype(self) -> str | None: + """Product subtype""" + return self._subtype + + @property + def timestamp(self) -> datetime: + """Product timestamp""" + return self._timestamp + + def __str__(self) -> str: + return f"Product(type='{self._type}', subtype='{self._subtype}', timestamp='{self._timestamp}'])" + + def __eq__(self, __value: object) -> bool: + return ( + isinstance(__value, Product) + and self._type == __value.type + and self._subtype == __value.subtype + and self._timestamp == __value.timestamp + ) diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/config_manager.py b/services/config_manager.py new file mode 100644 index 0000000..8565e1a --- /dev/null +++ b/services/config_manager.py @@ -0,0 +1,50 @@ +"""Loads and validates config.json file against a config json schema""" +import locale +import json +from typing import Literal +import jsonschema + + +class ConfigManager: + """Loads and validates config.json file against a config json schema""" + + def __init__(self, file_path: str) -> None: + config_schema = { + "type": "object", + "properties": { + "bucket": {"type": "string"}, + "awsAccessKeyId": {"type": "string"}, + "awsSecretAccessKey": {"type": "string"}, + "storageMode": {"type": "string", "enum": ["local", "remote"]}, + }, + "required": ["bucket", "awsAccessKeyId", "awsSecretAccessKey"], + "additionalProperties": False, + } + + with open( + file_path, "r", encoding=locale.getpreferredencoding() + ) as config_file: + self.config = json.load(config_file) + jsonschema.validate(instance=self.config, schema=config_schema) + + @property + def bucket(self) -> str: + """The bucket to
upload to""" + return self.config["bucket"] + + @property + def aws_access_key_id(self) -> str: + """Your AWS access key""" + return self.config["awsAccessKeyId"] + + @property + def aws_secret_access_key(self) -> str: + """Your AWS secret key""" + return self.config["awsSecretAccessKey"] + + @property + def storage_mode(self) -> Literal["local", "remote"]: + """Whether to use local or remote storage""" + return ( + "remote" if "storageMode" not in self.config else self.config["storageMode"] + ) diff --git a/services/file_watcher.py b/services/file_watcher.py new file mode 100644 index 0000000..0776a8a --- /dev/null +++ b/services/file_watcher.py @@ -0,0 +1,13 @@ +from typing import Callable +from watchdog.events import FileSystemEventHandler, FileSystemEvent + + +class FileWatcher(FileSystemEventHandler): + def __init__(self, callback: Callable[[str], None]): + self.callback = callback + + def on_created(self, event: FileSystemEvent): + if event.is_directory: + return + + self.callback(event.src_path) diff --git a/services/local_file_manager.py b/services/local_file_manager.py new file mode 100644 index 0000000..5415dcc --- /dev/null +++ b/services/local_file_manager.py @@ -0,0 +1,10 @@ +class LocalFileManager: + def __init__(self) -> None: + print("Using faux file manager. 
No files will be uploaded") + print() + + def upload_file(self, file_path: str, key: str): + pass + + def upload_empty_file(self, file_key: str) -> None: + pass diff --git a/services/s3_file_manager.py b/services/s3_file_manager.py new file mode 100644 index 0000000..bf6dfb1 --- /dev/null +++ b/services/s3_file_manager.py @@ -0,0 +1,20 @@ +import boto3 + + +class S3FileManager: + def __init__(self, aws_access_key_id: str, aws_secret_access_key: str, bucket: str): + self.aws_access_key_id = aws_access_key_id + self.aws_secret_access_key = aws_secret_access_key + self.bucket = bucket + + self.s3_client = boto3.client( + "s3", + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + ) + + def upload_file(self, file_path: str, s3_key: str): + self.s3_client.upload_file(file_path, self.bucket, s3_key) + + def upload_empty_file(self, file_key: str) -> None: + self.s3_client.put_object(Bucket=self.bucket, Key=file_key, Body="") diff --git a/settings.py b/settings.py deleted file mode 100644 index c1c1cfd..0000000 --- a/settings.py +++ /dev/null @@ -1 +0,0 @@ -BUCKET = "intterrademoinputs" diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..398a43f --- /dev/null +++ b/setup.py @@ -0,0 +1,15 @@ +from setuptools import find_packages, setup + +setup( + name="airborne_dsa", + version="1.0", + description="Data shipping app to support uploading files to Intterra airborne buckets", + author="Intterra", + author_email="devs@intterragroup.com", + packages=find_packages(), # same as name + install_requires=[ + "boto3==1.28.26", + "jsonschema==4.19.0", + "watchdog==3.0.0", + ], +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/bad_config.json b/tests/bad_config.json new file mode 100644 index 0000000..2ff6ddf --- /dev/null +++ b/tests/bad_config.json @@ -0,0 +1,3 @@ +{ + "buck": "hi" +} \ No newline at end of file diff --git a/tests/good_config.json 
b/tests/good_config.json new file mode 100644 index 0000000..f3dc897 --- /dev/null +++ b/tests/good_config.json @@ -0,0 +1,5 @@ +{ + "bucket": "hi", + "awsAccessKeyId": "", + "awsSecretAccessKey": "" +} \ No newline at end of file diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..7dd0dd7 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,17 @@ +import unittest +import os +from jsonschema import ValidationError + +from services.config_manager import ConfigManager + + +class TestConfig(unittest.TestCase): + def test_bad_schema(self): + root_directory = os.path.dirname(os.path.realpath(__file__)) + with self.assertRaises(ValidationError): + ConfigManager(root_directory + "/bad_config.json") + + def test_good_schema(self): + root_directory = os.path.dirname(os.path.realpath(__file__)) + config = ConfigManager(root_directory + "/good_config.json") + self.assertEqual(config.bucket, "hi") diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..3550909 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,95 @@ +from datetime import datetime, timezone +import unittest +from unittest.mock import patch + +from main import create_product_from_file_path, get_mission_details, get_product_s3_key +from models.product import Product + + +class TestMain(unittest.TestCase): + def test_get_mission_details_success(self): + now = datetime.now().replace(microsecond=0) + with patch( + "builtins.input", + side_effect=["Buckwheat Ridge", now.strftime("%Y-%m-%d %H:%M")], + ): + mission_name, mission_time = get_mission_details() + self.assertEqual(mission_name, "Buckwheat-Ridge") + self.assertEqual( + mission_time, + now.astimezone(timezone.utc).replace(second=0).replace(tzinfo=None), + ) + + @patch("sys.exit") + def test_get_mission_details_date_failure(self, mock_exit): + with patch( + "builtins.input", + side_effect=["Buckwheat Ridge", "1"], + ): + get_mission_details() + mock_exit.assert_called_once_with(1) + + 
@patch("os.path.getmtime", return_value=1234567890.0) + def test_create_product_from_file_path_image(self, mock_getmtime): + file_path = "images/EO/some_image.tif" + expected_product = Product( + "image", "EO", datetime.fromtimestamp(1234567890.0, tz=timezone.utc) + ) + created_product = create_product_from_file_path(file_path) + print(created_product) + print(expected_product) + self.assertEqual(created_product, expected_product) + + mock_getmtime.assert_called_once_with(file_path) + + @patch("os.path.getmtime", return_value=1234567890.0) + def test_create_product_from_file_path_tactical(self, mock_getmtime): + file_path = "tactical/Detection/some_detection.kml" + expected_product = Product( + "tactical", + "Detection", + datetime.fromtimestamp(1234567890.0, tz=timezone.utc), + ) + created_product = create_product_from_file_path(file_path) + self.assertEqual(created_product, expected_product) + + mock_getmtime.assert_called_once_with(file_path) + + @patch("os.path.getmtime", return_value=1234567890.0) + def test_create_product_from_file_path_video(self, mock_getmtime): + file_path = "videos/some_video.ts" + expected_product = Product( + "video", None, datetime.fromtimestamp(1234567890.0, tz=timezone.utc) + ) + created_product = create_product_from_file_path(file_path) + self.assertEqual(created_product, expected_product) + + mock_getmtime.assert_called_once_with(file_path) + + @patch("os.path.getmtime", return_value=1234567890.0) + def test_create_product_from_file_path_invalid(self, mock_getmtime): + file_path = "invalid_path/some_file.txt" + with self.assertRaises(ValueError): + create_product_from_file_path(file_path) + + mock_getmtime.assert_called_once_with(file_path) + + def test_get_product_s3_key_image_eo(self): + product = Product("image", "EO", datetime.now(timezone.utc)) + s3_key = get_product_s3_key("Mission123", product, ".tif") + expected_s3_key = f"IMAGERY/{product.timestamp.strftime('%Y%m%d_%H%M%SZ')}_Mission123_EOimage.tif" + 
self.assertEqual(s3_key, expected_s3_key) + + def test_get_product_s3_key_tactical_detection(self): + product = Product("tactical", "Detection", datetime.now(timezone.utc)) + s3_key = get_product_s3_key("Mission456", product, ".kml") + expected_s3_key = f"TACTICAL/{product.timestamp.strftime('%Y%m%d_%H%M%SZ')}_Mission456_Detection.kml" + self.assertEqual(s3_key, expected_s3_key) + + def test_get_product_s3_key_video(self): + product = Product("video", None, datetime.now(timezone.utc)) + s3_key = get_product_s3_key("Mission789", product, ".ts") + expected_s3_key = ( + f"VIDEO/{product.timestamp.strftime('%Y%m%d_%H%M%SZ')}_Mission789_Video.ts" + ) + self.assertEqual(s3_key, expected_s3_key)