Enhance benchmark #1531

Merged · 8 commits · Feb 14, 2025

19 changes: 7 additions & 12 deletions ChatQnA/benchmark_chatqna.yaml
@@ -67,22 +67,17 @@ deploy:

benchmark:
# http request behavior related fields
concurrency: [1, 2, 4]
totoal_query_num: [2048, 4096]
duration: [5, 10] # unit minutes
query_num_per_concurrency: [4, 8, 16]
possion: True
possion_arrival_rate: 1.0
user_queries: [1, 2, 4]
load_shape_type: "poisson" # "constant" or "poisson"
concurrent_level: 5
poisson_arrival_rate: 1.0
warmup_iterations: 10
seed: 1024

# workload, all of the test cases will run for benchmark
test_cases:
- chatqnafixed
- chatqna_qlist_pubmed:
dataset: pub_med10 # pub_med10, pub_med100, pub_med1000
user_queries: [1, 2, 4]
query_token_size: 128 # if specified, means fixed query token size will be sent out
bench_target: [chatqnafixed, chatqna_qlist_pubmed] # specify the bench_target for benchmark
dataset: ["/home/sdp/upload_file.txt", "/home/sdp/pubmed_10000.txt"] # specify the absolute path to the dataset file
prompt: [10, 1000] # set the prompt length for the chatqna_qlist_pubmed workload, set to 10 for chatqnafixed workload

llm:
# specify the llm output token size
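
The new `bench_target`, `dataset`, and `prompt` fields are parallel lists that are consumed index by index (see the `_create_stresscli_confs` change further down in this diff). A minimal pre-flight sketch that checks the alignment, assuming PyYAML and the layout above; `check_benchmark_lists` is a hypothetical helper, not part of this PR:

```python
import yaml

def check_benchmark_lists(config_path="ChatQnA/benchmark_chatqna.yaml"):
    """Verify that bench_target, dataset and prompt line up index by index."""
    with open(config_path) as f:
        cfg = yaml.safe_load(f)
    bench = cfg["benchmark"]
    targets = bench.get("bench_target", [])
    datasets = bench.get("dataset", [])
    prompts = bench.get("prompt", [])
    if not (len(targets) == len(datasets) == len(prompts)):
        raise ValueError(
            f"bench_target ({len(targets)}), dataset ({len(datasets)}) and "
            f"prompt ({len(prompts)}) must have the same length"
        )
    for target, path, prompt in zip(targets, datasets, prompts):
        print(f"{target}: dataset={path}, prompt={prompt}")

if __name__ == "__main__":
    check_benchmark_lists()
```
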
12 changes: 10 additions & 2 deletions README-deploy-benchmark.md
@@ -75,11 +75,19 @@ Before running benchmarks, you need to:

1. **Prepare Test Data**

- Download the retrieval file:
- Testing for general benchmark target:

Download the retrieval file using the command below for data ingestion in RAG:

```bash
wget https://github.com/opea-project/GenAIEval/tree/main/evals/benchmark/data/upload_file.txt
```
- For the `chatqna_qlist_pubmed` test case, prepare `pubmed_${max_lines}.txt` by following this [README](https://github.com/opea-project/GenAIEval/blob/main/evals/benchmark/stresscli/README_Pubmed_qlist.md)

- Testing for pubmed benchmark target:

For the `chatqna_qlist_pubmed` test case, prepare `pubmed_${max_lines}.txt` by following this [README](https://github.com/opea-project/GenAIEval/blob/main/evals/benchmark/stresscli/README_Pubmed_qlist.md)

After the data is prepared, please update the `absolute path` of this file in the benchmark.yaml file. For example, in the `ChatQnA/benchmark_chatqna.yaml` file, `/home/sdp/upload_file.txt` should be replaced by your file path.

2. **Prepare Model Files (Recommended)**
```bash
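
For step 1 above, the dataset paths in `ChatQnA/benchmark_chatqna.yaml` have to be swapped for your own absolute paths. A small sketch of doing that programmatically, assuming PyYAML and the config layout from the first file in this diff; the two paths are placeholders:

```python
import yaml

# Hypothetical absolute paths on your machine; replace with your own locations.
my_upload = "/data/upload_file.txt"
my_pubmed = "/data/pubmed_10000.txt"

config_path = "ChatQnA/benchmark_chatqna.yaml"
with open(config_path) as f:
    cfg = yaml.safe_load(f)

# The dataset list is positional and must follow the order of bench_target.
cfg["benchmark"]["dataset"] = [my_upload, my_pubmed]

with open(config_path, "w") as f:
    yaml.safe_dump(cfg, f, sort_keys=False)
```

Note that round-tripping through `yaml.safe_dump` drops the comments in the file, so editing the two entries by hand works just as well.
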
156 changes: 113 additions & 43 deletions benchmark.py
@@ -2,9 +2,9 @@
# SPDX-License-Identifier: Apache-2.0

import os
import sys
from datetime import datetime

import requests
import yaml
from evals.benchmark.stresscli.commands.load_test import locust_runtests
from kubernetes import client, config
@@ -25,17 +25,15 @@ def construct_benchmark_config(test_suite_config):
"""Extract relevant data from the YAML based on the specified test cases."""

return {
"concurrency": test_suite_config.get("concurrency", []),
"totoal_query_num": test_suite_config.get("user_queries", []),
"duration:": test_suite_config.get("duration:", []),
"query_num_per_concurrency": test_suite_config.get("query_num_per_concurrency", []),
"possion": test_suite_config.get("possion", False),
"possion_arrival_rate": test_suite_config.get("possion_arrival_rate", 1.0),
"user_queries": test_suite_config.get("user_queries", [1]),
"load_shape_type": test_suite_config.get("load_shape_type", "constant"),
"concurrent_level": test_suite_config.get("concurrent_level", 5),
"poisson_arrival_rate": test_suite_config.get("poisson_arrival_rate", 1.0),
"warmup_iterations": test_suite_config.get("warmup_iterations", 10),
"seed": test_suite_config.get("seed", None),
"test_cases": test_suite_config.get("test_cases", ["chatqnafixed"]),
"user_queries": test_suite_config.get("user_queries", [1]),
"query_token_size": test_suite_config.get("query_token_size", 128),
"bench_target": test_suite_config.get("bench_target", ["chatqnafixed"]),
"dataset": test_suite_config.get("dataset", ["upload_file.txt"]),
"prompt": test_suite_config.get("prompt", [10]),
"llm_max_token_size": test_suite_config.get("llm", {}).get("max_token_size", [128]),
}
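
For the example `ChatQnA/benchmark_chatqna.yaml` shown earlier in this diff, the reworked `construct_benchmark_config` would return roughly the following dictionary (a sketch based on the keys visible in this hunk; the `llm` section is truncated in this view, so `[128]` below is just the fallback default):

```python
# Approximate output of construct_benchmark_config for the example YAML above.
parsed_data = {
    "user_queries": [1, 2, 4],
    "load_shape_type": "poisson",
    "concurrent_level": 5,
    "poisson_arrival_rate": 1.0,
    "warmup_iterations": 10,
    "seed": 1024,
    "bench_target": ["chatqnafixed", "chatqna_qlist_pubmed"],
    "dataset": ["/home/sdp/upload_file.txt", "/home/sdp/pubmed_10000.txt"],
    "prompt": [10, 1000],
    "llm_max_token_size": [128],  # fallback default; the llm section is cut off in this diff
}
```
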

@@ -116,14 +114,16 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
print(spec)

# get folder path of opea-eval
eval_path = None
import pkg_resources

for dist in pkg_resources.working_set:
if "opea-eval" in dist.project_name:
eval_path = dist.location
# TODO: use environment variable for now until related issues been fixed in opea-eval
eval_path = os.getenv("EVAL_PATH", "")
# import pkg_resources

# for dist in pkg_resources.working_set:
# if "opea-eval" in dist.project_name:
# eval_path = dist.location
# break
if not eval_path:
print("Fail to load opea-eval package. Please install it first.")
print("Fail to find the opea-eval package. Please install/download it first.")
exit(1)

yaml_content = {
@@ -134,7 +134,7 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie
"locustfile": os.path.join(eval_path, "evals/benchmark/stresscli/locust/aistress.py"),
"host": base_url,
"stop-timeout": test_params["query_timeout"],
"processes": 2,
"processes": test_params["concurrent_level"],
"namespace": test_params["namespace"],
"bench-target": bench_target,
"service-metric-collect": test_params["collect_service_metric"],
@@ -162,25 +162,18 @@ def _create_stresscli_confs(case_params, test_params, test_phase, num_queries, b
"""Create a stresscli configuration file and persist it on disk."""
stresscli_confs = []
# Get the workload
test_cases = test_params["test_cases"]
for test_case in test_cases:
bench_target = test_params["bench_target"]
for i, b_target in enumerate(bench_target):
stresscli_conf = {}
print(test_case)
if isinstance(test_case, str):
bench_target = test_case
elif isinstance(test_case, dict):
bench_target = list(test_case.keys())[0]
dataset_conf = test_case[bench_target]
if bench_target == "chatqna_qlist_pubmed":
max_lines = dataset_conf["dataset"].split("pub_med")[-1]
stresscli_conf["envs"] = {"DATASET": f"pubmed_{max_lines}.txt", "MAX_LINES": max_lines}
print(f"[OPEA BENCHMARK] 🚀 Running test for {b_target} in phase {test_phase} for {num_queries} queries")
stresscli_conf["envs"] = {"DATASET": test_params["dataset"][i], "MAX_LINES": str(test_params["prompt"][i])}
# Generate the content of stresscli configuration file
stresscli_yaml = _create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params)
stresscli_yaml = _create_yaml_content(case_params, base_url, b_target, test_phase, num_queries, test_params)

# Dump the stresscli configuration file
service_name = case_params.get("service_name")
run_yaml_path = os.path.join(
test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}_{bench_target}.yaml"
test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}_{b_target}.yaml"
)
with open(run_yaml_path, "w") as yaml_file:
yaml.dump(stresscli_yaml, yaml_file)
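
Taking the example YAML values, the rewritten loop pairs each `bench_target` entry with the `dataset` and `prompt` entry at the same index, and exports `prompt` as the `MAX_LINES` environment variable. A small illustration (values copied from the example config):

```python
# Values from the example ChatQnA/benchmark_chatqna.yaml.
bench_target = ["chatqnafixed", "chatqna_qlist_pubmed"]
dataset = ["/home/sdp/upload_file.txt", "/home/sdp/pubmed_10000.txt"]
prompt = [10, 1000]

# Mirrors the indexed pairing done in _create_stresscli_confs.
for i, b_target in enumerate(bench_target):
    envs = {"DATASET": dataset[i], "MAX_LINES": str(prompt[i])}
    print(b_target, envs)
# chatqnafixed {'DATASET': '/home/sdp/upload_file.txt', 'MAX_LINES': '10'}
# chatqna_qlist_pubmed {'DATASET': '/home/sdp/pubmed_10000.txt', 'MAX_LINES': '1000'}
```
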
@@ -215,7 +208,61 @@ def create_stresscli_confs(service, base_url, test_suite_config, index):
return stresscli_confs


def _run_service_test(example, service, test_suite_config):
def ingest_data_to_db(service, dataset, namespace):
"""Ingest data into the database."""
for service_name in service.get("service_list"):
if "data" in service_name:
# Ingest data into the database
print(f"[OPEA BENCHMARK] 🚀 Ingesting data into the database for {service_name}...")
try:
svc_ip, port = _get_service_ip(service_name, "k8s", None, None, namespace)
url = f"http://{svc_ip}:{port}/v1/dataprep/ingest"

files = {"files": open(dataset, "rb")}

response = requests.post(url, files=files)
if response.status_code != 200:
print(f"Error ingesting data: {response.text}. Status code: {response.status_code}")
return False
if "Data preparation succeeded" not in response.text:
print(f"Error ingesting data: {response.text}. Response: {response}")
return False

except Exception as e:
print(f"Error ingesting data: {e}")
return False
print(f"[OPEA BENCHMARK] 🚀 Data ingestion completed for {service_name}.")
break
return True


def clear_db(service, namespace):
"""Delete all files from the database."""
for service_name in service.get("service_list"):
if "data" in service_name:
# Delete data from the database
try:
svc_ip, port = _get_service_ip(service_name, "k8s", None, None, namespace)
url = f"http://{svc_ip}:{port}/v1/dataprep/delete"
data = {"file_path": "all"}
print(f"[OPEA BENCHMARK] 🚀 Deleting data from the database for {service_name} with {url}")

response = requests.post(url, json=data, headers={"Content-Type": "application/json"})
if response.status_code != 200:
print(f"Error deleting data: {response.text}. Status code: {response.status_code}")
return False
if "true" not in response.text:
print(f"Error deleting data: {response.text}. Response: {response}")
return False
except Exception as e:
print(f"Error deleting data: {e}")
return False
print(f"[OPEA BENCHMARK] 🚀 Data deletion completed for {service_name}.")
break
return True
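
The two helpers above talk to the dataprep service's `/v1/dataprep/ingest` and `/v1/dataprep/delete` endpoints. A standalone sketch of the same request pattern; the address and file path are placeholders, since in `benchmark.py` they come from `_get_service_ip()` and the configured dataset:

```python
import requests

# Placeholder address; in benchmark.py this comes from _get_service_ip() for the dataprep service.
base = "http://10.20.0.5:6007"

# Ingest a file (multipart upload, mirroring ingest_data_to_db).
with open("/home/sdp/upload_file.txt", "rb") as f:
    resp = requests.post(f"{base}/v1/dataprep/ingest", files={"files": f})
print(resp.status_code, resp.text)

# Remove everything afterwards (JSON body, mirroring clear_db).
resp = requests.post(
    f"{base}/v1/dataprep/delete",
    json={"file_path": "all"},
    headers={"Content-Type": "application/json"},
)
print(resp.status_code, resp.text)
```
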


def _run_service_test(example, service, test_suite_config, namespace):
"""Run the test for a specific service and example."""
print(f"[OPEA BENCHMARK] 🚀 Example: [ {example} ] Service: [ {service.get('service_name')} ], Running test...")

@@ -251,12 +298,31 @@ def _run_service_test(example, service, test_suite_config):
run_yaml_path = stresscli_conf["run_yaml_path"]
print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...")
os.environ["MAX_TOKENS"] = str(service.get("max_output"))

dataset = None
if stresscli_conf.get("envs") is not None:
for key, value in stresscli_conf.get("envs").items():
os.environ[key] = value

if key == "DATASET":
dataset = value
if not dataset:
print(f"[OPEA BENCHMARK] 🚀 Dataset is not specified for {service_name}. Check the benchmark.yaml again.")

# Ingest data into the database for single run of benchmark
result = ingest_data_to_db(service, dataset, namespace)
if not result:
print(f"[OPEA BENCHMARK] 🚀 Data ingestion failed for {service_name}.")
exit(1)

# Run the benchmark test and append the output folder to the list
output_folders.append(locust_runtests(None, run_yaml_path))

# Delete all files from the database after the test
result = clear_db(service, namespace)
if not result:
print(f"[OPEA BENCHMARK] 🚀 Data deletion failed for {service_name}.")
exit(1)

print(f"[OPEA BENCHMARK] 🚀 Test completed for {service_name} at {url}")
return output_folders

@@ -279,16 +345,21 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor
"service_port": None, # Leave as None for k8s, specify for Docker
"test_output_dir": os.getcwd() + "/benchmark_output", # The directory to store the test output
"load_shape": {
"name": "constant",
"params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}},
"name": parsed_data["load_shape_type"],
"params": {
"constant": {"concurrent_level": parsed_data["concurrent_level"]},
"poisson": {"arrival_rate": parsed_data["poisson_arrival_rate"]},
},
},
"concurrent_level": 4,
"arrival_rate": 1.0,
"concurrent_level": parsed_data["concurrent_level"],
"arrival_rate": parsed_data["poisson_arrival_rate"],
"query_timeout": 120,
"warm_ups": parsed_data["warmup_iterations"],
"seed": parsed_data["seed"],
"namespace": namespace,
"test_cases": parsed_data["test_cases"],
"bench_target": parsed_data["bench_target"],
"dataset": parsed_data["dataset"],
"prompt": parsed_data["prompt"],
"llm_max_token_size": parsed_data["llm_max_token_size"],
}

@@ -313,15 +384,14 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor
"chatqna-retriever-usvc",
"chatqna-tei",
"chatqna-teirerank",
"chatqna-tgi",
"chatqna-vllm",
],
"test_cases": parsed_data["test_cases"],
# Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt
"prompts": query_data,
"max_output": llm_max_token, # max number of output tokens
"k": 1, # number of retrieved documents
}
output_folder = _run_service_test(chart_name, case_data, test_suite_config)
output_folder = _run_service_test(chart_name, case_data, test_suite_config, namespace)

print(f"[OPEA BENCHMARK] 🚀 Test Finished. Output saved in {output_folder}.")

@@ -339,5 +409,5 @@


if __name__ == "__main__":
benchmark_config = load_yaml("./benchmark.yaml")
run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="deploy-benchmark")
benchmark_config = load_yaml("./ChatQnA/benchmark_chatqna.yaml")
run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="benchmark")
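
Since the `pkg_resources` lookup is commented out for now, `EVAL_PATH` must point at a local GenAIEval checkout before the script runs. A sketch of driving `run_benchmark` directly with that variable set; the paths, namespace, and model name are placeholders, and it assumes `benchmark.py` and its dependencies are importable from the working directory:

```python
import os

from benchmark import load_yaml, run_benchmark

# EVAL_PATH must point at your local GenAIEval checkout (placeholder path).
os.environ["EVAL_PATH"] = "/home/sdp/GenAIEval"

# Config path, chart, namespace and model are placeholders; adjust to your deployment.
benchmark_config = load_yaml("./ChatQnA/benchmark_chatqna.yaml")
run_benchmark(
    benchmark_config=benchmark_config,
    chart_name="chatqna",
    namespace="benchmark",
    llm_model="meta-llama/Meta-Llama-3-8B-Instruct",
)
```
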