Skip to content

Commit 90192fc

Browse files
committed
[CI/UT][PD Disaggreate] Initialize PD Disaggreate UT
Signed-off-by: MengqingCao <cmq0113@163.com>
1 parent e2a0c19 commit 90192fc

File tree

9 files changed

+326
-16
lines changed

9 files changed

+326
-16
lines changed

.github/workflows/vllm_ascend_test_pd.yaml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,6 @@ jobs:
5555
options: >-
5656
--device /dev/davinci0
5757
--device /dev/davinci1
58-
--device /dev/davinci2
59-
--device /dev/davinci3
60-
--device /dev/davinci4
61-
--device /dev/davinci5
62-
--device /dev/davinci6
63-
--device /dev/davinci7
6458
--device /dev/davinci_manager
6559
--device /dev/devmm_svm
6660
--device /dev/hisi_hdc
@@ -105,3 +99,7 @@ jobs:
10599
run: |
106100
pip install -r requirements-dev.txt
107101
pip install -v -e .
102+
103+
- name: Run vllm-project/vllm-ascend PD Disaggregation test
104+
run: |
105+
pytest -sv tests/e2e/pd_disaggreate/test_pd_e2e.py

examples/disaggregated_prefill/disaggregated_prefill_offline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from multiprocessing import Event, Process
1414

1515
kv_connector_extra_config = {
16-
"prompt_device_ips": ["1.2.3.1", "1.2.3.2"],
16+
"prefill_device_ips": ["1.2.3.1", "1.2.3.2"],
1717
"decode_device_ips": ["1.2.3.9", "1.2.3.10"],
1818
"llmdatadist_comm_port": 26000,
1919
}

examples/disaggregated_prefill/p2p_disaggrefated_prefill_proxy.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ async def handle_request():
181181

182182

183183
if __name__ == "__main__":
184-
t = start_service_discovery("0.0.0.0", 30001)
185-
app.run(host="0.0.0.0", port=10001)
184+
import argparse
185+
parser = argparse.ArgumentParser(
186+
description="args of disaggregated-prefill proxy")
187+
parser.add_argument("--http-port", type=int, default=10001)
188+
parser.add_argument("--register-port", type=int, default=10002)
189+
args = parser.parse_args()
190+
191+
t = start_service_discovery("0.0.0.0", args.register_port)
192+
app.run(host="0.0.0.0", port=args.http_port)
186193
t.join()

packages.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
git
22
vim
33
wget
4+
jq
5+
curl

requirements-dev.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ types-jsonschema
1010
xgrammar
1111
zmq
1212
numba
13+
quart
14+
types-psutil

tests/e2e/pd_disaggreate/setup_pd.sh

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#!/bin/bash
2+
3+
#
4+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
# This file is a part of the vllm-ascend project.
18+
#
19+
20+
function run_prefill_instance() {
21+
local model_name=$1
22+
local tp_size=$2
23+
local prefill_port=$3
24+
local register_port=$4
25+
local prefill_device_ips=$5
26+
local decode_device_ips=$6
27+
28+
echo "================================"
29+
echo "Testing model: $model_name"
30+
echo "================================"
31+
# Start prefill instance
32+
33+
KV_CONFIG=$(jq -n \
34+
--arg kv_connector "AscendSimpleConnector" \
35+
--arg kv_buffer_device "npu" \
36+
--arg kv_role "kv_producer" \
37+
--argjson kv_parallel_size 8 \
38+
--arg kv_port "11001" \
39+
--argjson prefill_device_ips "$prefill_device_ips" \
40+
--argjson decode_device_ips "$decode_device_ips" \
41+
--argjson llmdatadist_comm_port "26000" \
42+
--arg proxy_ip "0.0.0.0" \
43+
--argjson proxy_port "$register_port" \
44+
--argjson http_port "$prefill_port" \
45+
'{
46+
"kv_connector": $kv_connector,
47+
"kv_buffer_device": $kv_buffer_device,
48+
"kv_role": $kv_role,
49+
"kv_parallel_size": $kv_parallel_size,
50+
"kv_port": $kv_port,
51+
"kv_connector_extra_config": {
52+
"prefill_device_ips": $prefill_device_ips,
53+
"decode_device_ips": $decode_device_ips,
54+
"llmdatadist_comm_port": $llmdatadist_comm_port,
55+
"proxy_ip": $proxy_ip,
56+
"proxy_port": $proxy_port,
57+
"http_port": $http_port
58+
}
59+
}')
60+
61+
# start prefill instance
62+
ASCEND_RT_VISIBLE_DEVICES=0 vllm serve $model_name \
63+
--host 0.0.0.0 \
64+
--port $prefill_port \
65+
--tensor-parallel-size $tp_size \
66+
--served-model-name Deepseek \
67+
--max-model-len 2000 \
68+
--trust-remote-code \
69+
--kv-transfer-config "$KV_CONFIG" &
70+
}
71+
72+
73+
74+
function run_decode_instance() {
75+
# Start decode instance
76+
local model_name=$1
77+
local tp_size=$2
78+
local decode_port=$3
79+
local register_port=$4
80+
local prefill_device_ips=$5
81+
local decode_device_ips=$6
82+
83+
KV_CONFIG=$(jq -n \
84+
--arg kv_connector "AscendSimpleConnector" \
85+
--arg kv_buffer_device "npu" \
86+
--arg kv_role "kv_consumer" \
87+
--argjson kv_parallel_size 8 \
88+
--arg kv_port "21001" \
89+
--argjson prefill_device_ips "$prefill_device_ips" \
90+
--argjson decode_device_ips "$decode_device_ips" \
91+
--argjson llmdatadist_comm_port "26000" \
92+
--arg proxy_ip "0.0.0.0" \
93+
--argjson proxy_port "$register_port" \
94+
--argjson http_port "$decode_port" \
95+
'{
96+
"kv_connector": $kv_connector,
97+
"kv_buffer_device": $kv_buffer_device,
98+
"kv_role": $kv_role,
99+
"kv_parallel_size": $kv_parallel_size,
100+
"kv_port": $kv_port,
101+
"kv_connector_extra_config": {
102+
"prefill_device_ips": $prefill_device_ips,
103+
"decode_device_ips": $decode_device_ips,
104+
"llmdatadist_comm_port": $llmdatadist_comm_port,
105+
"proxy_ip": $proxy_ip,
106+
"proxy_port": $proxy_port,
107+
"http_port": $http_port
108+
}
109+
}')
110+
111+
# start decode instance
112+
ASCEND_RT_VISIBLE_DEVICES=1 vllm serve $model_name \
113+
--host 0.0.0.0 \
114+
--port $decode_port \
115+
--tensor-parallel-size $tp_size \
116+
--seed 1024 \
117+
--served-model-name Deepseek \
118+
--max-model-len 2000 \
119+
--max-num-batched-tokens 2000 \
120+
--trust-remote-code \
121+
--gpu-memory-utilization 0.9 \
122+
--kv-transfer-config "$KV_CONFIG" &
123+
}
124+
125+
function run_proxy_server() {
126+
# Build the command for the proxy server with all the hosts and ports
127+
register_port=$1
128+
proxy_port=$2
129+
PROXY_CMD="python examples/disaggregated_prefill/p2p_disaggrefated_prefill_proxy.py --http-port $proxy_port --register-port $register_port"
130+
131+
# Start the proxy server
132+
echo "Starting proxy server with command: $PROXY_CMD"
133+
$PROXY_CMD &
134+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#
2+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# This file is a part of the vllm-ascend project.
16+
#
17+
18+
import os
19+
import signal
20+
import subprocess
21+
import time
22+
23+
import psutil
24+
import requests
25+
26+
27+
def kill_process_and_children(pid):
28+
try:
29+
parent = psutil.Process(pid)
30+
children = parent.children(recursive=True)
31+
for child in children:
32+
print(f"Killing child process {child.pid}")
33+
child.kill()
34+
print(f"Killing parent process {pid}")
35+
parent.kill()
36+
except psutil.NoSuchProcess:
37+
pass
38+
39+
40+
def kill_all_vllm_related():
41+
current_pid = os.getpid()
42+
43+
for proc in psutil.process_iter(['pid', 'cmdline']):
44+
try:
45+
if proc.pid == current_pid:
46+
continue
47+
cmd = ' '.join(proc.info['cmdline'])
48+
if "vllm" in cmd or "proxy" in cmd or "engine_worker" in cmd:
49+
kill_process_and_children(proc.pid)
50+
except Exception:
51+
continue
52+
53+
54+
PROXY_PORT = 10102
55+
DECODE_PORT = 8002
56+
57+
SCRIPT_PATH = os.path.abspath("./tests/e2e/run_disagg_pd.sh")
58+
59+
60+
def wait_for_port(port, timeout=30):
61+
import socket
62+
start = time.time()
63+
while time.time() - start < timeout:
64+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
65+
if sock.connect_ex(("127.0.0.1", port)) == 0:
66+
return True
67+
time.sleep(1)
68+
raise TimeoutError(f"Port {port} not ready after {timeout}s")
69+
70+
71+
def start_and_test_pipeline():
72+
print("Launching bash script to run vLLM PD setup...")
73+
proc = subprocess.Popen(["bash", SCRIPT_PATH])
74+
try:
75+
print("Waiting for proxy port to be available...")
76+
wait_for_port(PROXY_PORT, 180)
77+
wait_for_port(DECODE_PORT, 600)
78+
79+
# request
80+
payload = {
81+
"model": "Deepseek",
82+
"prompt": "The future of AI is",
83+
"max_tokens": 64,
84+
"temperature": 0,
85+
}
86+
response = requests.post(
87+
f"http://localhost:{PROXY_PORT}/v1/completions",
88+
headers={"Content-Type": "application/json"},
89+
json=payload,
90+
timeout=10)
91+
assert response.status_code == 200, f"HTTP failed: {response.status_code}"
92+
result = response.json()
93+
print("Response:", result)
94+
assert "text" in result["choices"][0]
95+
assert len(result["choices"][0]["text"].strip()) > 0
96+
97+
finally:
98+
# clean up subprocesses
99+
print("Cleaning up subprocess...")
100+
proc.send_signal(signal.SIGINT)
101+
try:
102+
proc.wait(timeout=10)
103+
except subprocess.TimeoutExpired:
104+
proc.kill()
105+
kill_all_vllm_related()
106+
107+
108+
def test_disaggregated_pd_pipeline():
109+
start_and_test_pipeline()

tests/e2e/run_disagg_pd.sh

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#!/bin/bash
2+
3+
#
4+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
# This file is a part of the vllm-ascend project.
18+
#
19+
20+
set -eo errexit
21+
22+
. $(dirname "$0")/common.sh
23+
. $(dirname "$0")/pd_disaggreate/setup_pd.sh
24+
25+
export VLLM_USE_MODELSCOPE="True"
26+
27+
MODEL_NAME="deepseek-ai/DeepSeek-V2-Lite"
28+
# TODO: add tp case
29+
TP_SIZE=1
30+
31+
# TODO: support multi-card
32+
prefill_ip=$(/usr/local/Ascend/driver/tools/hccn_tool -i 0 -ip -g | grep "ipaddr" | awk -F: '{print $2}' | xargs)
33+
PREFILL_DEVICE_IPS="[\"$prefill_ip\"]"
34+
35+
decode_ip=$(/usr/local/Ascend/driver/tools/hccn_tool -i 1 -ip -g | grep "ipaddr" | awk -F: '{print $2}' | xargs)
36+
DECODE_DEVICE_IPS="[\"$decode_ip\"]"
37+
38+
_info "====> Start pd disaggregated test"
39+
REGISTER_PORT=10101
40+
PREOXY_PORT=10102
41+
run_proxy_server $REGISTER_PORT $PREOXY_PORT
42+
_info "Started pd disaggregated proxy server"
43+
44+
PREFILL_PROC_NAME="Prefill-instance"
45+
PREFILL_PORT=8001
46+
run_prefill_instance $MODEL_NAME $TP_SIZE $PREFILL_PORT $REGISTER_PORT $PREFILL_DEVICE_IPS $DECODE_DEVICE_IPS
47+
_info "Starting prefill instance"
48+
49+
wait_url_ready $PREFILL_PROC_NAME "http://localhost:${PREFILL_PORT}/v1/completions"
50+
51+
DECODE_PROC_NAME="Decode-instance"
52+
DECODE_PORT=8002
53+
run_decode_instance $MODEL_NAME $TP_SIZE $DECODE_PORT $REGISTER_PORT $PREFILL_DEVICE_IPS $DECODE_DEVICE_IPS
54+
_info "Starting decode instance"
55+
56+
wait_url_ready $DECODE_PROC_NAME "http://localhost:${DECODE_PORT}/v1/completions"
57+
58+
_info "pd disaggregated system is ready for handling request"

vllm_ascend/distributed/kv_transfer/simple_pipe.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,22 @@ def __init__(
6161
raise NotImplementedError(
6262
"kv_role should be inside [kv_producer, kv_consumer]")
6363

64-
prompt_device_ips = kv_connector_extra_config.get(
65-
"prompt_device_ips", None)
64+
prefill_device_ips = kv_connector_extra_config.get(
65+
"prefill_device_ips", None)
6666
decode_device_ips = kv_connector_extra_config.get(
6767
"decode_device_ips", None)
68-
if prompt_device_ips is None or decode_device_ips is None:
68+
if prefill_device_ips is None or decode_device_ips is None:
6969
raise ValueError(
70-
"Please specify prompt_device_ips and decode_device_ips"
70+
"Please specify prefill_device_ips and decode_device_ips"
7171
"in kv_transfer_config.kv_connector_extra_config")
72-
p_device_num = len(prompt_device_ips)
72+
p_device_num = len(prefill_device_ips)
7373
d_device_num = len(decode_device_ips)
7474
# When number of devices in P and D is not equal,
7575
# we assume that device in D can be mapped to any device in P.
7676
self.p_device_rank = self.rank % p_device_num
7777
self.d_device_rank = self.rank % d_device_num
7878

79-
self.prompt_ip_list = prompt_device_ips
79+
self.prompt_ip_list = prefill_device_ips
8080
self.decode_ip_list = decode_device_ips
8181
self.llmdatadist_comm_port = kv_connector_extra_config.get(
8282
"llmdatadist_comm_port", 26000)
@@ -98,7 +98,7 @@ def __init__(
9898
if proxy_ip == "" or proxy_port == "":
9999
self.proxy_address = ""
100100
else:
101-
self.proxy_address = proxy_ip + ":" + proxy_port
101+
self.proxy_address = proxy_ip + ":" + str(proxy_port)
102102

103103
self._register_thread = None
104104
if port_offset == 0 and self.proxy_address != "":

0 commit comments

Comments
 (0)