diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 5e7ab58..b047435 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -91,6 +91,18 @@ build:md5:
       when: always
     - when: manual
 
+build:hash_with_metadata:
+  extends: .build_template
+  script:
+    - cd transformers
+    - cd hash_with_metadata && make -B all
+  rules:
+    - changes:
+        - transformers/hash_with_metadata/**/*
+        - transformers/tests/test_hash_with_metadata.py
+      when: always
+    - when: manual
+
 build:tar2tf:
   extends: .build_template
   script:
@@ -170,6 +182,12 @@ test:md5:
   script:
     - cd transformers/tests && pytest -v test_md5.py
 
+test:hash_with_metadata:
+  extends: .test_template
+  needs: ["build:hash_with_metadata"]
+  script:
+    - cd transformers/tests && pytest -v test_hash_with_metadata.py
+
 test:tar2tf:
   extends: .test_template
   needs: ["build:tar2tf"]
diff --git a/transformers/hash_with_metadata/Dockerfile b/transformers/hash_with_metadata/Dockerfile
new file mode 100644
index 0000000..11698bb
--- /dev/null
+++ b/transformers/hash_with_metadata/Dockerfile
@@ -0,0 +1,14 @@
+FROM python:3.11-alpine
+
+COPY requirements.txt requirements.txt
+RUN pip3 install -r requirements.txt
+
+RUN mkdir /code
+WORKDIR /code
+COPY server.py server.py
+
+ENV PYTHONUNBUFFERED=1
+
+EXPOSE 80
+
+ENTRYPOINT [ "/code/server.py", "--listen", "0.0.0.0", "--port", "80" ]
diff --git a/transformers/hash_with_metadata/Makefile b/transformers/hash_with_metadata/Makefile
new file mode 100644
index 0000000..2187613
--- /dev/null
+++ b/transformers/hash_with_metadata/Makefile
@@ -0,0 +1,15 @@
+# Default image tag is 'latest'
+TAG := latest
+ifeq ($(GIT_TEST), true)
+  TAG := test
+endif
+
+REGISTRY_URL ?= docker.io/aistorage
+
+all: build push
+
+build:
+	docker build -t $(REGISTRY_URL)/transformer_hash_with_metadata:$(TAG) .
+
+push:
+	docker push $(REGISTRY_URL)/transformer_hash_with_metadata:$(TAG)
diff --git a/transformers/hash_with_metadata/README.md b/transformers/hash_with_metadata/README.md
new file mode 100644
index 0000000..0a8641e
--- /dev/null
+++ b/transformers/hash_with_metadata/README.md
@@ -0,0 +1,26 @@
+# Hash with Metadata Transformer
+
+A simple hash transformer that processes objects (bytes) by extracting ETL metadata from an inline transform request and using it as a seed value to compute a seeded hash. This example demonstrates how to pass custom metadata for each individual object through an ETL inline transform and utilize it within your pod.
+
+### Initializing ETL with AIStore CLI
+
+The following steps demonstrate how to initialize the `transformer-hash-with-metadata` ETL using the [AIStore CLI](https://github.com/NVIDIA/aistore/blob/master/docs/cli.md):
+
+```bash
+$ cd transformers/hash_with_metadata
+
+$ # Mention communication type b/w target and container
+$ export COMMUNICATION_TYPE='hpull://'
+
+$ # Substitute env variables in spec file
+$ envsubst < pod.yaml > init_spec.yaml
+
+$ # Initialize ETL
+$ ais etl init spec --from-file init_spec.yaml --name <etl-name> --comm-type "hpull://"
+
+$ # Put an object
+$ ais object put <your-file> ais://<bucket-name>
+
+$ # Transform and retrieve objects from the bucket using this ETL with metadata
+$ curl -L -X GET "${AIS_ENDPOINT}/v1/objects/<bucket-name>/<object-name>?etl_name=<etl-name>&etl_meta=100000"
+```
\ No newline at end of file
diff --git a/transformers/hash_with_metadata/pod.yaml b/transformers/hash_with_metadata/pod.yaml
new file mode 100644
index 0000000..0aa6b73
--- /dev/null
+++ b/transformers/hash_with_metadata/pod.yaml
@@ -0,0 +1,24 @@
+apiVersion: v1
+kind: Pod
+metadata:
+  name: transformer-hash-with-metadata
+  annotations:
+    # Values it can take ["hpull://","hrev://","hpush://"]
+    communication_type: ${COMMUNICATION_TYPE:-"\"hpull://\""}
+    wait_timeout: 5m
+spec:
+  containers:
+    - name: server
+      image: aistorage/transformer_hash_with_metadata:latest
+      imagePullPolicy: IfNotPresent
+      ports:
+        - name: default
+          containerPort: 80
+      command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
+      readinessProbe:
+        httpGet:
+          path: /health
+          port: default
+      env:
+        - name: SEED_DEFAULT
+          value: "0"
diff --git a/transformers/hash_with_metadata/requirements.txt b/transformers/hash_with_metadata/requirements.txt
new file mode 100644
index 0000000..c25c766
--- /dev/null
+++ b/transformers/hash_with_metadata/requirements.txt
@@ -0,0 +1,2 @@
+requests
+xxhash
\ No newline at end of file
diff --git a/transformers/hash_with_metadata/server.py b/transformers/hash_with_metadata/server.py
new file mode 100755
index 0000000..957363f
--- /dev/null
+++ b/transformers/hash_with_metadata/server.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+
+import argparse
+import xxhash
+import requests
+import os
+import logging
+from urllib.parse import urlparse, parse_qs
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from socketserver import ThreadingMixIn
+
+host_target = os.environ['AIS_TARGET_URL']
+seed_default = int(os.getenv("SEED_DEFAULT", "0"))
+# Seconds to wait on the AIS target before giving up; override via REQUEST_TIMEOUT.
+request_timeout = float(os.getenv("REQUEST_TIMEOUT", "60"))
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s",
+)
+
+
+class Handler(BaseHTTPRequestHandler):
+    """Reply to PUT/GET requests with a seeded xxh64 hex digest of the object."""
+
+    def log_request(self, code='-', size='-'):
+        # Don't log successful requests info. Unsuccessful logged by log_error().
+        pass
+
+    def _set_headers(self):
+        """Start a 200 response with a plain-text content type."""
+        self.send_response(200)
+        self.send_header("Content-Type", "text/plain")
+        self.end_headers()
+
+    def _parse_seed(self):
+        """Return the seed from the `etl_meta` query parameter.
+
+        Falls back to `seed_default` when the parameter is absent and raises
+        ValueError when its value is not an integer.
+        """
+        params = parse_qs(urlparse(self.path).query)
+        if "etl_meta" not in params:
+            return seed_default
+        return int(params["etl_meta"][0])
+
+    def _send_hash(self, data, seed):
+        """Write the seeded digest of `data` as a 200 response."""
+        hash_result = self.calculate_xxhash(data, seed)
+        self._set_headers()
+        self.wfile.write(hash_result.encode())
+
+    def do_PUT(self):
+        """Hash the request body, seeded by `etl_meta` when present."""
+        try:
+            content_length = int(self.headers['Content-Length'])
+            post_data = self.rfile.read(content_length)
+            self._send_hash(post_data, self._parse_seed())
+        except ValueError as e:
+            # Malformed client input (e.g. non-integer etl_meta) is not a server fault.
+            logging.error("Bad PUT request: %s", e)
+            self.send_error(400, f"Bad Request: {e}")
+        except Exception as e:
+            logging.error("Error in PUT request: %s", e)
+            self.send_error(500, f"Internal Server Error: {e}")
+
+    def do_GET(self):
+        """Serve the health probe, or fetch the object from the target and hash it."""
+        if self.path == "/health":
+            self._set_headers()
+            self.wfile.write(b"Running")
+            return
+
+        try:
+            # Validate the seed before fetching so bad input costs no round trip.
+            seed = self._parse_seed()
+            response = requests.get(host_target + self.path, timeout=request_timeout)
+            # Without this an error reply from the target would be hashed as the object.
+            response.raise_for_status()
+            self._send_hash(response.content, seed)
+        except requests.RequestException as http_err:
+            # Must precede ValueError: some requests exceptions subclass it.
+            logging.error("HTTP error in GET request: %s", http_err)
+            self.send_error(502, f"Bad Gateway: {http_err}")
+        except ValueError as e:
+            logging.error("Bad GET request: %s", e)
+            self.send_error(400, f"Bad Request: {e}")
+        except Exception as e:
+            logging.error("Error in GET request: %s", e)
+            self.send_error(500, f"Internal Server Error: {e}")
+
+    def calculate_xxhash(self, data, seed):
+        """Return the hex digest of `data` hashed with xxh64 and `seed`."""
+        hasher = xxhash.xxh64(seed=seed)
+        hasher.update(data)
+        return hasher.hexdigest()
+
+
+class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
+    """Handle requests in a separate thread."""
+
+
+def run(addr="localhost", port=8000):
+    """Start the threaded HTTP server."""
+    logging.info("Starting HTTP server on %s:%s", addr, port)
+    try:
+        server = ThreadedHTTPServer((addr, port), Handler)
+        server.serve_forever()
+    except KeyboardInterrupt:
+        logging.info("Shutting down the server.")
+    except Exception as e:
+        logging.error("Unexpected server error: %s", e)
+    finally:
+        logging.info("Server stopped.")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Run a simple HTTP server")
+    parser.add_argument(
+        "-l",
+        "--listen",
+        default="localhost",
+        help="Specify the IP address on which the server listens",
+    )
+    parser.add_argument(
+        "-p",
+        "--port",
+        type=int,
+        default=8000,
+        help="Specify the port on which the server listens",
+    )
+    args = parser.parse_args()
+    run(addr=args.listen, port=args.port)
diff --git a/transformers/tests/test_hash_with_metadata.py b/transformers/tests/test_hash_with_metadata.py
new file mode 100644
index 0000000..10dfc43
--- /dev/null
+++ b/transformers/tests/test_hash_with_metadata.py
@@ -0,0 +1,92 @@
+#
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# pylint: disable=missing-class-docstring, missing-function-docstring, missing-module-docstring
+
+import xxhash
+import random
+
+from aistore.sdk.etl.etl_const import ETL_COMM_HPULL, ETL_COMM_HPUSH, ETL_COMM_HREV
+
+from tests.utils import git_test_mode_format_image_tag_test
+from tests.base import TestBase
+
+HASH_WITH_METADATA_SPEC_TEMPLATE = """
+apiVersion: v1
+kind: Pod
+metadata:
+  name: transformer-hash-with-metadata
+  annotations:
+    # Values it can take ["hpull://","hrev://","hpush://"]
+    communication_type: "{communication_type}://"
+    wait_timeout: 5m
+spec:
+  containers:
+    - name: server
+      image: aistorage/transformer_hash_with_metadata:latest
+      imagePullPolicy: Always
+      ports:
+        - name: default
+          containerPort: 80
+      command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
+      readinessProbe:
+        httpGet:
+          path: /health
+          port: default
+      env:
+        - name: SEED_DEFAULT
+          value: "{seed_default}"
+"""
+
+class TestHashWithMetadataTransformer(TestBase):
+    def setUp(self):
+        super().setUp()
+        self.test_image_filename = "test-image.jpg"
+        self.test_image_source = "./resources/test-image.jpg"
+        self.test_text_filename = "test-text.txt"
+        self.test_text_source = "./resources/test-text.txt"
+        self.test_bck.object(self.test_image_filename).get_writer().put_file(self.test_image_source)
+        self.test_bck.object(self.test_text_filename).get_writer().put_file(self.test_text_source)
+
+    def seeded_hash_file(self, filepath, seed):
+        with open(filepath, "rb") as file:
+            file_content = file.read()
+            hasher = xxhash.xxh64(seed=seed)
+            hasher.update(file_content)
+            return hasher.hexdigest()
+
+    def compare_transformed_data_with_seeded_hash(self, filename, original_filepath, seed):
+        transformed_data_bytes = (
+            self.test_bck.object(filename).get_reader(etl_name=self.test_etl.name).read_all()
+        )
+        original_file_hash = self.seeded_hash_file(original_filepath, seed)
+        self.assertEqual(transformed_data_bytes.decode("utf-8"), original_file_hash)
+
+    def run_seeded_hash_test(self, communication_type):
+        seed_default = random.randint(0, 1000)
+        template = HASH_WITH_METADATA_SPEC_TEMPLATE.format(communication_type=communication_type, seed_default=seed_default)
+
+        if self.git_test_mode == "true":
+            # NOTE(review): the image is named transformer_hash_with_metadata (underscores);
+            # confirm the helper matches it when given "hash-with-metadata".
+            template = git_test_mode_format_image_tag_test(template, "hash-with-metadata")
+
+        self.test_etl.init_spec(
+            template=template, communication_type=communication_type
+        )
+
+        self.compare_transformed_data_with_seeded_hash(
+            self.test_image_filename, self.test_image_source, seed_default
+        )
+        self.compare_transformed_data_with_seeded_hash(
+            self.test_text_filename, self.test_text_source, seed_default
+        )
+
+    def test_seeded_hash_hpull(self):
+        self.run_seeded_hash_test(ETL_COMM_HPULL)
+
+    def test_seeded_hash_hpush(self):
+        self.run_seeded_hash_test(ETL_COMM_HPUSH)
+
+    def test_seeded_hash_hrev(self):
+        self.run_seeded_hash_test(ETL_COMM_HREV)