add nemo-ffmpeg transformer
Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
gaikwadabhishek committed Oct 18, 2024
1 parent 2df6cae commit 4d67899
Showing 7 changed files with 391 additions and 0 deletions.
16 changes: 16 additions & 0 deletions transformers/NeMo/FFMPEG/Dockerfile
@@ -0,0 +1,16 @@
FROM python:3.11-alpine

# Install ffmpeg
RUN apk add --no-cache ffmpeg

COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

RUN mkdir /code
WORKDIR /code
COPY server.py server.py

ENV PYTHONUNBUFFERED=1

EXPOSE 80
ENTRYPOINT [ "/code/server.py", "--listen", "0.0.0.0", "--port", "80" ]
15 changes: 15 additions & 0 deletions transformers/NeMo/FFMPEG/Makefile
@@ -0,0 +1,15 @@
# Default image tag is 'latest'
TAG := latest
ifeq ($(GIT_TEST), true)
TAG := test
endif

REGISTRY_URL ?= docker.io/aistorage

all: build push

build:
	docker build -t $(REGISTRY_URL)/transformer_nemo_ffmpeg:$(TAG) .

push:
	docker push $(REGISTRY_URL)/transformer_nemo_ffmpeg:$(TAG)
103 changes: 103 additions & 0 deletions transformers/NeMo/FFMPEG/README.md
@@ -0,0 +1,103 @@
# NeMo FFMPEG Transformer

This transformer is based on [NeMo's Speech Data Processor (SDP) Toolkit](https://github.com/NVIDIA/NeMo-speech-data-processor). It converts audio files to WAV format, with control over the number of audio channels (`AC`) and the audio sampling rate (`AR`).
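With the default settings in [`pod.yaml`](pod.yaml) (`AR=16000`, `AC=1`), every transformed object comes out as mono, 16-bit PCM WAV sampled at 16 kHz. A minimal sketch for verifying this on a transformed object you have saved locally (the filename is a placeholder):

```python
import wave

# Inspect a transformed object saved locally (placeholder filename).
with wave.open("output.wav", "rb") as wav_file:
    print("channels:    ", wav_file.getnchannels())  # expected: 1 (AC)
    print("sample rate: ", wav_file.getframerate())  # expected: 16000 (AR)
    print("sample width:", wav_file.getsampwidth(), "bytes")  # 2 -> 16-bit PCM
```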

To transform your audio files using this ETL, follow these steps:

## Initialize the ETL

1. **Navigate to the Directory**
Go to the directory that contains the pod specification file ([`pod.yaml`](pod.yaml)).

```bash
cd ais-etl/transformers/NeMo/FFMPEG/
```

2. **Configure AIStore Endpoint**
Ensure the `AIS_ENDPOINT` environment variable points to the correct AIStore cluster.

3. **Edit Configuration**
Edit the `AR` (Audio Rate) and `AC` (Audio Channels) values in the [`pod.yaml`](pod.yaml) file to match your desired output settings.

4. **Initialize the ETL**
Run the following command to create the ETL in the AIStore cluster:

```bash
ais etl init spec --from-file pod.yaml --comm-type <communication-type> --name <etl-name>
```
- `<etl-name>`: Name for your ETL.
- `<communication-type>`: Communication type for your ETL. (ref: [Communication Mechanisms](https://github.com/NVIDIA/aistore/blob/main/docs/etl.md#communication-mechanisms)). Example: `hpull`, `hpush`, etc.

### Arg Type: FQN (Fully Qualified Name)

When initializing the ETL, you can specify the argument type as Fully Qualified Name (FQN) by adding `--arg-type fqn` to the command. Using FQN means that the AIStore target will send the file path of the object to the transformation pod, rather than the object data itself. The transformation pod will then be responsible for opening, reading, transforming, and closing the corresponding file—in this case, the audio files.
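Conceptually, the two argument types differ only in how the transformation pod obtains the object bytes for a GET request. The following is a condensed sketch of the logic implemented in [`server.py`](server.py):

```python
import os
import requests

AIS_TARGET_URL = os.getenv("AIS_TARGET_URL", "")

def read_object(path: str, arg_type: str) -> bytes:
    """Return the raw object bytes for a GET request path."""
    if arg_type == "fqn":
        # FQN mode: the request path is a local file path on an attached disk.
        with open(path, "rb") as file:
            return file.read()
    # Default mode: fetch the object bytes from the AIStore target over HTTP.
    response = requests.get(AIS_TARGET_URL + path)
    response.raise_for_status()
    return response.content
```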

**Initialization with FQN:**

```bash
ais etl init spec --from-file pod_with_fqn.yaml --comm-type hpull --arg-type fqn --name <etl-name>
```

- Replace `<etl-name>` with a name for your ETL.
- Use `pod_with_fqn.yaml` as your specification file, which includes the necessary disk attachments.

**Advantages:**

- **Performance Improvement**: Using FQN can provide a slight performance boost because it avoids transferring the object data over the network to the transformation pod.

**Disadvantages:**

- **Disk Attachment Required**: You must attach all the disks that are attached to the AIStore target to the transformation pod. This requires updating the pod specification as shown in [`pod_with_fqn.yaml`](pod_with_fqn.yaml).

## Transform Data Using the ETL

There are two ways to transform data using this ETL:

### 1. Inline Transformation (During GET Request)

Transform a single object and save the output to a file:

```bash
ais etl object <etl-name> <bucket-name>/<object-name> <output-file>.wav
```

- `<etl-name>`: Name of the ETL you initialized.
- `<bucket-name>`: Name of the bucket containing your audio file.
- `<object-name>`: Name of the audio file to transform.
- `<output-file>.wav`: Filename for the transformed WAV file.

This command transforms the specified object and saves it as a WAV file.
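The same inline transformation can be driven from the `aistore` Python SDK, where the ETL name is attached to the GET call. A sketch that assumes the SDK is installed; the endpoint, bucket, object, and ETL names are placeholders, and method names may differ slightly between SDK versions:

```python
from aistore.sdk import Client

# Placeholders: point the client at your cluster and use your own ETL/bucket/object names.
client = Client("http://aistore-proxy:51080")
wav_bytes = (
    client.bucket("audio-bucket")
    .object("recording.m4a")
    .get(etl_name="nemo-ffmpeg")  # transform inline as part of the GET
    .read_all()
)

with open("recording.wav", "wb") as out:
    out.write(wav_bytes)
```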

### 2. Offline Transformation (Batch Processing)

Transform multiple objects in parallel and save them to another bucket. This method is faster and leverages AIStore's parallelization capabilities.

#### Get Help on the Command

To see all options for the `bucket` command, run:

```bash
ais etl bucket -h
```

#### Sample Command

```bash
ais etl bucket <etl-name> <source-bucket> <destination-bucket> \
--cont-on-err \
--num-workers 500 \
--ext "{wav:wav,opus:wav,m4a:wav}" \
--prefix=<virtual-sub-directory> \
--prepend="transformed/"
```

- `<etl-name>`: Name of the ETL you initialized.
- `<source-bucket>`: Bucket containing the original audio files.
- `<destination-bucket>`: Bucket where transformed files will be saved.
- `--cont-on-err`: Continue processing even if errors occur.
- `--ext "{wav:wav,opus:wav,m4a:wav}"`: Map input extensions to output extensions. If you don't specify this, the transformed objects keep their original names and extensions.
- `--num-workers 500`: (Optional) Number of parallel workers (adjust as needed).
- `--prefix=<virtual-sub-directory>`: (Optional) Process only files within this sub-directory.
- `--prepend="transformed/"`: (Optional) Prepend this path to the destination objects.

This command transforms all data in the `<source-bucket>` (optionally within the specified virtual sub-directory) and saves it to the `<destination-bucket>`, optionally under the `transformed/` sub-directory.
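The bucket-to-bucket transformation can likewise be started from the `aistore` Python SDK. A sketch mirroring the CLI example above; argument names follow recent SDK releases but may vary by version, and all endpoint and bucket names are placeholders:

```python
from aistore.sdk import Client

client = Client("http://aistore-proxy:51080")  # placeholder endpoint

# Offline (bucket-to-bucket) transformation, mirroring the CLI flags above.
job_id = client.bucket("source-bucket").transform(
    etl_name="nemo-ffmpeg",
    to_bck=client.bucket("destination-bucket"),
    ext={"wav": "wav", "opus": "wav", "m4a": "wav"},
    prepend="transformed/",
)
client.job(job_id=job_id).wait()  # block until the batch job finishes
```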
28 changes: 28 additions & 0 deletions transformers/NeMo/FFMPEG/pod.yaml
@@ -0,0 +1,28 @@
apiVersion: v1
kind: Pod
metadata:
  name: transformer-nemo-ffmpeg
  annotations:
    # Supported values: "hpull://", "hrev://", "hpush://"
    communication_type: "hpull://"
    wait_timeout: 5m
spec:
  containers:
    - name: server
      image: aistorage/transformer_nemo_ffmpeg:latest
      imagePullPolicy: IfNotPresent
      ports:
        - name: default
          containerPort: 80
      command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
      readinessProbe:
        httpGet:
          path: /health
          port: default
      env:
        - name: AR
          value: "16000" # Sample rate
        - name: AC
          value: "1" # Audio channels
        - name: ARG_TYPE
          value: "" # Leave empty to fetch object bytes over HTTP; set to "fqn" to read files locally (see pod_with_fqn.yaml)
91 changes: 91 additions & 0 deletions transformers/NeMo/FFMPEG/pod_with_fqn.yaml
@@ -0,0 +1,91 @@
apiVersion: v1
kind: Pod
metadata:
  name: transformer-nemo-ffmpeg-fqn
  annotations:
    # Supported values: "hpull://", "hrev://", "hpush://"
    communication_type: "hpull://"
    wait_timeout: 5m
spec:
  containers:
    - name: server
      image: aistorage/transformer_nemo_ffmpeg:latest
      imagePullPolicy: IfNotPresent
      ports:
        - name: default
          containerPort: 80
      command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
      readinessProbe:
        httpGet:
          path: /health
          port: default
      env:
        - name: AR
          value: "16000" # Sample rate
        - name: AC
          value: "1" # Audio channels
        - name: ARG_TYPE
          value: "fqn" # "fqn" makes the pod open each file locally instead of fetching it from the AIStore target
      # Add volume mounts and volumes for each of the disks attached to your targets
      volumeMounts:
        - name: sda
          mountPath: /ais/sda
        - name: sdb
          mountPath: /ais/sdb
        - name: sdc
          mountPath: /ais/sdc
        - name: sdd
          mountPath: /ais/sdd
        - name: sde
          mountPath: /ais/sde
        - name: sdf
          mountPath: /ais/sdf
        - name: sdg
          mountPath: /ais/sdg
        - name: sdh
          mountPath: /ais/sdh
        - name: sdi
          mountPath: /ais/sdi
        - name: sdj
          mountPath: /ais/sdj
  volumes:
    - name: sda
      hostPath:
        path: /ais/sda
        type: Directory
    - name: sdb
      hostPath:
        path: /ais/sdb
        type: Directory
    - name: sdc
      hostPath:
        path: /ais/sdc
        type: Directory
    - name: sdd
      hostPath:
        path: /ais/sdd
        type: Directory
    - name: sde
      hostPath:
        path: /ais/sde
        type: Directory
    - name: sdf
      hostPath:
        path: /ais/sdf
        type: Directory
    - name: sdg
      hostPath:
        path: /ais/sdg
        type: Directory
    - name: sdh
      hostPath:
        path: /ais/sdh
        type: Directory
    - name: sdi
      hostPath:
        path: /ais/sdi
        type: Directory
    - name: sdj
      hostPath:
        path: /ais/sdj
        type: Directory
1 change: 1 addition & 0 deletions transformers/NeMo/FFMPEG/requirements.txt
@@ -0,0 +1 @@
requests
137 changes: 137 additions & 0 deletions transformers/NeMo/FFMPEG/server.py
@@ -0,0 +1,137 @@
#!/usr/bin/env python

import argparse
import requests
import os
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import subprocess
import io
import wave
from urllib.parse import unquote

# Fetch environment variables with defaults
HOST_TARGET = os.getenv("AIS_TARGET_URL", "")
AR = int(os.getenv("AR", 44100))
AC = int(os.getenv("AC", 1))
ARG_TYPE = os.getenv("ARG_TYPE", "").lower()


# Define the transform function for audio processing
def transform(input_bytes: bytes, ac: int = AC, ar: int = AR) -> bytes:
    process_args = [
        "ffmpeg",
        "-nostdin",
        "-loglevel",
        "error",
        "-i",
        "pipe:0",
        "-map",
        "0:a",
        "-ac",
        str(ac),
        "-ar",
        str(ar),
        "-c:a",
        "pcm_s16le",
        "-f",
        "s16le",  # Output raw PCM data
        "-y",
        "pipe:1",
    ]

    # Run ffmpeg and capture raw PCM data
    process = subprocess.Popen(
        process_args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    raw_audio_data, stderr = process.communicate(input=input_bytes)

    if process.returncode != 0:
        raise RuntimeError(f"FFmpeg process failed: {stderr.decode()}")

    # Create a WAV file in memory
    with io.BytesIO() as wav_io:
        with wave.open(wav_io, "wb") as wav_file:
            wav_file.setnchannels(ac)
            wav_file.setsampwidth(2)  # 16-bit audio
            wav_file.setframerate(ar)
            wav_file.writeframes(raw_audio_data)
        output_bytes = wav_io.getvalue()

    return output_bytes


class RequestHandler(BaseHTTPRequestHandler):
    def log_request(self, code="-", size="-"):
        # Suppress request logs; error logs will be handled separately
        pass

    def _set_headers(self, content_length=None, content_type="audio/wav"):
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        if content_length is not None:
            self.send_header("Content-Length", str(content_length))
        self.end_headers()

    def do_PUT(self):
        content_length = int(self.headers["Content-Length"])
        post_data = self.rfile.read(content_length)
        try:
            output_bytes = transform(post_data)
            self._set_headers(content_length=len(output_bytes))
            self.wfile.write(output_bytes)
        except RuntimeError as error:
            self.send_error(500, str(error))

    def do_GET(self):
        if self.path == "/health":
            response = b"Running"
            self._set_headers(content_length=len(response), content_type="text/plain")
            self.wfile.write(response)
            return

        try:
            if ARG_TYPE == "fqn":
                decoded_path = unquote(self.path)
                safe_path = os.path.normpath(
                    os.path.join("/", decoded_path.lstrip("/"))
                )
                with open(safe_path, "rb") as file:
                    file_content = file.read()
                output_bytes = transform(file_content)
            else:
                response = requests.get(HOST_TARGET + self.path)
                response.raise_for_status()
                output_bytes = transform(response.content)
            self._set_headers(content_length=len(output_bytes))
            self.wfile.write(output_bytes)
        except FileNotFoundError as error:
            self.send_error(404, f"File not found: {error}")
        except requests.HTTPError as http_err:
            self.send_error(502, f"Error fetching data: {http_err}")
        except RuntimeError as error:
            self.send_error(500, str(error))


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""


def run_server(addr="localhost", port=8000):
    server = ThreadedHTTPServer((addr, port), RequestHandler)
    print(f"Starting HTTP server on {addr}:{port}")
    server.serve_forever()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run a simple HTTP server")
    parser.add_argument(
        "-l", "--listen", default="localhost", help="IP address to listen on"
    )
    parser.add_argument(
        "-p", "--port", type=int, default=8000, help="Port to listen on"
    )
    args = parser.parse_args()
    run_server(addr=args.listen, port=args.port)

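For a quick check of `server.py` outside Kubernetes, start it locally on a machine with `ffmpeg` installed (`python server.py --listen 127.0.0.1 --port 8000`) and exercise the `/health` and `PUT` handlers. A minimal smoke-test sketch; the input filename is a placeholder:

```python
import requests

BASE_URL = "http://127.0.0.1:8000"  # server started with --listen 127.0.0.1 --port 8000

# Readiness check against the /health endpoint.
assert requests.get(f"{BASE_URL}/health").text == "Running"

# Push any local audio file (placeholder name) through the PUT handler.
with open("sample.m4a", "rb") as audio:
    response = requests.put(BASE_URL, data=audio.read())
response.raise_for_status()

with open("sample.wav", "wb") as out:
    out.write(response.content)
print("wrote", len(response.content), "bytes of WAV audio")
```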