Skip to content

Commit

Permalink
Add ETL transformer for metadata passing
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Chen <a122774007@gmail.com>
  • Loading branch information
Nahemah1022 committed Feb 7, 2025
1 parent a39e013 commit 5e5e6d7
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ build:md5:
when: always
- when: manual

build:hash_with_metadata:
extends: .build_template
script:
- cd transformers
- cd hash_with_metadata && make -B all
rules:
- changes:
- transformers/hash_with_metadata/**/*
- transformers/tests/test_hash_with_metadata.py
when: always
- when: manual

build:tar2tf:
extends: .build_template
script:
Expand Down Expand Up @@ -170,6 +182,12 @@ test:md5:
script:
- cd transformers/tests && pytest -v test_md5.py

test:hash_with_metadata:
extends: .test_template
needs: ["build:hash_with_metadata"]
script:
- cd transformers/tests && pytest -v test_hash_with_metadata.py

test:tar2tf:
extends: .test_template
needs: ["build:tar2tf"]
Expand Down
14 changes: 14 additions & 0 deletions transformers/hash_with_metadata/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM python:3.11-alpine

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

RUN mkdir /code
WORKDIR /code
COPY server.py server.py

ENV PYTHONUNBUFFERED 1

EXPOSE 80

ENTRYPOINT [ "/code/server.py", "--listen", "0.0.0.0", "--port", "80" ]
15 changes: 15 additions & 0 deletions transformers/hash_with_metadata/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Default image tag is 'latest'
TAG := latest
ifeq ($(GIT_TEST), true)
TAG := test
endif

REGISTRY_URL ?= docker.io/aistorage

all: build push

build:
docker build -t $(REGISTRY_URL)/transformer_hash_with_metadata:$(TAG) .

push:
docker push $(REGISTRY_URL)/transformer_hash_with_metadata:$(TAG)
26 changes: 26 additions & 0 deletions transformers/hash_with_metadata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Hash with Metadata Transformer

A simple hash transformer that processes objects (bytes) by extracting ETL metadata from an inline transform request and using it as a seed value to compute a seeded hash. This example demonstrates how to pass custom metadata for each individual object through an ETL inline transform and utilize it within your pod.

### Initializing ETL with AIStore CLI

The following steps demonstrate how to initialize the `transformer-hash-with-metadata` with using the [AIStore CLI](https://github.com/NVIDIA/aistore/blob/master/docs/cli.md):

```!bash
$ cd transformers/hash_with_metadata
$ # Mention communication type b/w target and container
$ export COMMUNICATION_TYPE='hpull://'
# Substitute env variables in spec file
$ envsubst < pod.yaml > init_spec.yaml
$ # Initialize ETL
$ ais etl init spec --from-file init_spec.yaml --name <etl-name> --comm-type "hpull://"
$ # Put an object
$ ais object put <your-file> ais://<bck-name>
$ # Transform and retrieve objects from the bucket using this ETL with metadata
$ curl -L -X GET "${AIS_ENDPOINT}/v1/objects/<bck-name>/<your-file>?etl_name=<etl-name>&etl_meta=100000"
```
24 changes: 24 additions & 0 deletions transformers/hash_with_metadata/pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: v1
kind: Pod
metadata:
name: transformer-hash-with-metadata
annotations:
# Values it can take ["hpull://","hrev://","hpush://"]
communication_type: ${COMMUNICATION_TYPE:-"\"hpull://\""}
wait_timeout: 5m
spec:
containers:
- name: server
image: aistorage/transformer_hash_with_metadata:latest
imagePullPolicy: IfNotPresent
ports:
- name: default
containerPort: 80
command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
readinessProbe:
httpGet:
path: /health
port: default
env:
- name: SEED_DEFAULT
value: "0"
2 changes: 2 additions & 0 deletions transformers/hash_with_metadata/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
requests
xxhash
113 changes: 113 additions & 0 deletions transformers/hash_with_metadata/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python

import argparse
import xxhash
import requests
import os
import logging
from urllib.parse import urlparse, parse_qs
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn

host_target = os.environ['AIS_TARGET_URL']
seed_default = int(os.getenv("SEED_DEFAULT", "0"))

# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)

class Handler(BaseHTTPRequestHandler):
def log_request(self, code='-', size='-'):
# Don't log successful requests info. Unsuccessful logged by log_error().
pass

def _set_headers(self):
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()

def do_PUT(self):
try:
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
parsed_url = urlparse(self.path)
seed = seed_default
params = parse_qs(parsed_url.query)
if "etl_meta" in params:
seed = int(params["etl_meta"][0])

hash_result = self.calculate_xxhash(post_data, seed)
self._set_headers()
self.wfile.write(hash_result.encode())
except Exception as e:
logging.error("Error in PUT request: %s", e)
self.send_error(500, f"Internal Server Error: {e}")

def do_GET(self):
if self.path == "/health":
self._set_headers()
self.wfile.write(b"Running")
return

try:
parsed_url = urlparse(self.path)
x = requests.get(host_target + self.path)

seed = seed_default
params = parse_qs(parsed_url.query)
if "etl_meta" in params:
seed = int(params["etl_meta"][0])

hash_result = self.calculate_xxhash(x.content, seed)
self._set_headers()
self.wfile.write(hash_result.encode())
except requests.HTTPError as http_err:
logging.error("HTTP error in GET request: %s", http_err)
self.send_error(502, f"Bad Gateway: {http_err}")
except Exception as e:
logging.error("Error in GET request: %s", e)
self.send_error(500, f"Internal Server Error: {e}")

def calculate_xxhash(self, data, seed):
hasher = xxhash.xxh64(seed=seed)
hasher.update(data)
return hasher.hexdigest()


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Handle requests in a separate thread."""


def run(addr="localhost", port=8000):
"""Start the threaded HTTP server."""
logging.info("Starting HTTP server on %s:%s", addr, port)
try:
server = ThreadedHTTPServer((addr, port), Handler)
server.serve_forever()
except KeyboardInterrupt:
logging.info("Shutting down the server.")
except Exception as e:
logging.error("Unexpected server error: %s", e)
finally:
logging.info("Server stopped.")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run a simple HTTP server")
parser.add_argument(
"-l",
"--listen",
default="localhost",
help="Specify the IP address on which the server listens",
)
parser.add_argument(
"-p",
"--port",
type=int,
default=8000,
help="Specify the port on which the server listens",
)
args = parser.parse_args()
run(addr=args.listen, port=args.port)
90 changes: 90 additions & 0 deletions transformers/tests/test_hash_with_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# pylint: disable=missing-class-docstring, missing-function-docstring, missing-module-docstring

import xxhash
import random

from aistore.sdk.etl.etl_const import ETL_COMM_HPULL, ETL_COMM_HPUSH, ETL_COMM_HREV

from tests.utils import git_test_mode_format_image_tag_test
from tests.base import TestBase

HASH_WITH_METADATA_SPEC_TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
name: transformer-hash-with-metadata
annotations:
# Values it can take ["hpull://","hrev://","hpush://"]
communication_type: "{communication_type}://"
wait_timeout: 5m
spec:
containers:
- name: server
image: aistorage/transformer_hash_with_metadata:latest
imagePullPolicy: Always
ports:
- name: default
containerPort: 80
command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
readinessProbe:
httpGet:
path: /health
port: default
env:
- name: SEED_DEFAULT
value: "{seed_default}"
"""

class TestHashWithMetadataTransformer(TestBase):
def setUp(self):
super().setUp()
self.test_image_filename = "test-image.jpg"
self.test_image_source = "./resources/test-image.jpg"
self.test_text_filename = "test-text.txt"
self.test_text_source = "./resources/test-text.txt"
self.test_bck.object(self.test_image_filename).get_writer().put_file(self.test_image_source)
self.test_bck.object(self.test_text_filename).get_writer().put_file(self.test_text_source)

def seeded_hash_file(self, filepath, seed):
with open(filepath, "rb") as file:
file_content = file.read()
hasher = xxhash.xxh64(seed=seed)
hasher.update(file_content)
return hasher.hexdigest()

def compare_transformed_data_with_seeded_hash(self, filename, original_filepath, seed):
transformed_data_bytes = (
self.test_bck.object(filename).get_reader(etl_name=self.test_etl.name).read_all()
)
original_file_hash = self.seeded_hash_file(original_filepath, seed)
self.assertEqual(transformed_data_bytes.decode("utf-8"), original_file_hash)

def run_seeded_hash_test(self, communication_type):
seed_default=random.randint(0, 1000)
template = HASH_WITH_METADATA_SPEC_TEMPLATE.format(communication_type=communication_type, seed_default=seed_default)

if self.git_test_mode == "true":
template = git_test_mode_format_image_tag_test(template, "hash-with-metadata")

self.test_etl.init_spec(
template=template, communication_type=communication_type
)

self.compare_transformed_data_with_seeded_hash(
self.test_image_filename, self.test_image_source, seed_default
)
self.compare_transformed_data_with_seeded_hash(
self.test_text_filename, self.test_text_source, seed_default
)

def test_seeded_hash_hpull(self):
self.run_seeded_hash_test(ETL_COMM_HPULL)

def test_seeded_hash_hpush(self):
self.run_seeded_hash_test(ETL_COMM_HPUSH)

def test_seeded_hash_hrev(self):
self.run_seeded_hash_test(ETL_COMM_HREV)

0 comments on commit 5e5e6d7

Please sign in to comment.