Commit a85174c

Merge pull request #690 from viduship/vim-rgw-accounts
CEPH-83591689: RGW Accounts: scale bidirectional uploads & sync validation with Elbencho
2 parents 25339fd + 695e4a8

3 files changed: +327 -6 lines changed
multisite_configs/test_rgw_accounts_at_scale.yaml (new file, +24 lines)

# script: test_rgw_account_management.py
# CEPH-83591689
config:
  user_count: 1
  bucket_count: 3
  test_ops:
    enable_versioning: true
    create_bucket: true
    create_object: false
    version_count: 4
    delete_marker: false
    test_account_ownership_change: false
    test_rgwUser_adoption_by_rgwAccount: false
    test_via_rgw_accounts: true
    tenant_name: tenant1
    region: shared
    objects_per_bucket: 2250
    object_size: 2K
    threads: 400
    enable_version: true
    put_object_elbencho: true
    test_bucket_sync: true
    local_zone_name: primary
    remote_zone_name: secondary
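The suite exposes these keys as config.user_count, config.bucket_count, and config.test_ops.* (see the script changes below). As a quick sanity check, a minimal sketch that loads this file with PyYAML — the relative path is the one named in the test script's docstring, and the nesting is exactly what the code below reads:

import yaml

with open("multisite_configs/test_rgw_accounts_at_scale.yaml") as f:
    cfg = yaml.safe_load(f)["config"]

test_ops = cfg["test_ops"]
print(cfg["user_count"], cfg["bucket_count"])               # 1 3
print(test_ops["objects_per_bucket"], test_ops["threads"])  # 2250 400
print(test_ops["object_size"], test_ops["version_count"])   # 2K 4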
rgw/v2/tests/s3_swift/reusables/rgw_s3_elbencho.py (new file, +276 lines)

import json
import logging
import os
import random
import re
import subprocess
import threading
import time
from datetime import datetime

import boto
import boto3
import v2.utils.utils as utils
from v2.lib.exceptions import SyncFailedError, TestExecError
from v2.lib.rgw_config_opts import CephConfOp, ConfigOpts
from v2.lib.s3.write_io_info import (
    AddUserInfo,
    BasicIOInfoStructure,
    BucketIoInfo,
    IOInfoInitialize,
    KeyIoInfo,
)
from v2.tests.s3_swift import reusable
from v2.tests.s3_swift.reusables import rgw_accounts as accounts

log = logging.getLogger()


def json_serial(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


def get_endpoint_elbencho():
    """Determines the appropriate endpoint for Elbencho based on HAProxy availability."""
    log.info("Checking HAProxy status")
    try:
        haproxy_status = utils.exec_shell_cmd("systemctl is-active haproxy")
        if haproxy_status.strip() == "active":
            log.info("HAProxy is active, retrieving hostname")
            hostname = utils.exec_shell_cmd("hostname -s").strip()
            return f"http://{hostname}:5000"
    except Exception as e:
        log.error(f"Failed to determine HAProxy status: {e}")
    return None


def get_remote_endpoint_elbencho():
    """Fetches the HAProxy-enabled hostname on the remote site."""
    try:
        remote_site_ssh_con = reusable.get_remote_conn_in_multisite()
        stdin, stdout, stderr = remote_site_ssh_con.exec_command(
            "sudo ceph orch host ls"
        )
        cmd_output = stdout.read().decode()

        log.info(f"Remote site host list:\n{cmd_output}")

        for line in cmd_output.split("\n"):
            if "ha_io" in line:
                remote_hostname = line.split()[0]
                remote_endpoint = f"http://{remote_hostname}:5000"
                log.info(f"Remote endpoint determined: {remote_endpoint}")
                return remote_endpoint

    except Exception as e:
        log.error(f"Error fetching remote endpoint: {e}")

    return None


def install_elbencho(node_conn=None):
    """Installs Elbencho if it is not already installed."""
    log.info("Checking if Elbencho is already installed")
    try:
        output = utils.exec_shell_cmd("/usr/local/bin/elbencho --version", node_conn)
        if output:
            log.info("Elbencho is already installed.")
            return
    except Exception as e:
        log.warning(f"Error checking Elbencho version: {e}")

    log.info("Installing Elbencho...")
    cmds = [
        "wget https://github.com/breuner/elbencho/releases/download/v3.0-25/elbencho-static-x86_64.tar.gz",
        "tar -xf elbencho-static-x86_64.tar.gz",
        "sudo mv elbencho /usr/local/bin/",
        "sudo chmod +x /usr/local/bin/elbencho",
        "rm elbencho-static-x86_64.tar.gz",
    ]
    for cmd in cmds:
        result = utils.exec_shell_cmd(cmd, node_conn)
        if result is False:
            log.error(f"Command failed: {cmd}")
            raise TestExecError(f"Failed to install Elbencho. Command '{cmd}' failed.")
    log.info("Elbencho installation complete.")


def elbench_install_configure():
    """Installs and configures Elbencho on the client and remote node if applicable."""
    install_elbencho()
    if utils.is_cluster_multisite():
        log.info("Cluster is multisite, installing Elbencho on remote node")
        try:
            remote_site_ssh_con = reusable.get_remote_conn_in_multisite()
            install_elbencho(remote_site_ssh_con)
        except Exception as e:
            log.error(f"Failed to install Elbencho on remote site: {e}")


def run_elbencho(
    endpoint, zone_name, num_objects, buckets, each_user, threads, object_size
):
    """Runs Elbencho with the specified parameters."""
    log.info(
        f"[{zone_name}] Running Elbencho workload for {num_objects} objects on buckets {buckets}"
    )
    # The buckets share a common prefix ending in "-<index>", so the whole list
    # can be passed to elbencho as a single brace-expansion range, e.g. "prefix-{0..2}".
    bucket_prefix = "-".join(buckets[0].split("-")[:-1]) + "-"
    num_buckets = len(buckets)
    bucket_format = f"{bucket_prefix}{{0..{num_buckets - 1}}}"
    elbencho_cmd = (
        f"time /usr/local/bin/elbencho --s3endpoints {endpoint} --s3key {each_user['access_key']} --s3secret {each_user['secret_key']} "
        f"-w -t {threads} -n0 -N {num_objects} -s {object_size} {bucket_format}"
    )
    output = utils.exec_shell_cmd(elbencho_cmd)
    if output is False:
        log.error(f"Elbencho execution failed on {zone_name}")
        return
    metrics = parse_elbencho_output(output)
    log.info(f"[{zone_name}] Performance metrics: {metrics}")


def parse_elbencho_output(output):
    """Parses Elbencho output and extracts performance metrics."""
    log.info("Parsing Elbencho output")
    if not isinstance(output, str):
        log.error("Invalid output received from Elbencho command.")
        return {}
    metrics = {}
    lines = output.split("\n")
    for line in lines:
        if "Throughput MiB/s" in line:
            metrics["Throughput"] = line.split()[-1]
        elif "IOPS" in line:
            metrics["IOPS"] = line.split()[-1]
        elif "Total MiB" in line:
            metrics["Total Data Written (MiB)"] = line.split()[-1]
    return metrics


def verify_bucket_sync(buckets):
    """Checks bucket stats on both local and remote sites to verify sync consistency."""
    max_retries = 480  # 480 retries x 30s sleep = up to 4 hours per bucket
    sleep_interval = 30  # Sleep interval in seconds
    start_time = time.time()  # Track start time

    for bucket in buckets:
        if "tenant" in bucket:
            tenant_name, bucket_short_name = bucket.split(".", 1)
            bucket = f"{tenant_name}/{bucket}"

        for attempt in range(1, max_retries + 1):

            # Fetch local bucket stats
            local_stats_output = utils.exec_shell_cmd(
                f"radosgw-admin bucket stats --bucket {bucket}"
            )
            log.info(f"Raw local stats output for {bucket}: {local_stats_output}")

            # Extract JSON part from local stats
            local_json_match = re.search(r"\{.*\}", local_stats_output, re.DOTALL)
            local_stats = (
                json.loads(local_json_match.group(0)) if local_json_match else None
            )

            # Fetch remote bucket stats
            remote_site_ssh_con = reusable.get_remote_conn_in_multisite()
            stdin, stdout, stderr = remote_site_ssh_con.exec_command(
                f"radosgw-admin bucket stats --bucket {bucket}"
            )
            remote_stats_output = stdout.read().decode().strip()
            log.info(f"Raw remote stats output for {bucket}: {remote_stats_output}")

            # Extract JSON part from remote stats
            remote_json_match = re.search(r"\{.*\}", remote_stats_output, re.DOTALL)
            remote_stats = (
                json.loads(remote_json_match.group(0)) if remote_json_match else None
            )

            if not local_stats or not remote_stats:
                raise SyncFailedError(
                    f"Failed to fetch valid JSON stats for bucket: {bucket}"
                )

            # Compare num_objects across sites
            if (
                local_stats["usage"]["rgw.main"]["num_objects"]
                != remote_stats["usage"]["rgw.main"]["num_objects"]
            ):
                log.info(
                    f"Waiting for bucket sync for {bucket}, attempt {attempt}; sleeping {sleep_interval} secs before retry"
                )
                time.sleep(sleep_interval)
            else:
                log.info(
                    f"Sync is consistent for {bucket} after {attempt} attempt(s) with a sleep of {sleep_interval} secs."
                )
                break  # Object counts match; no need to retry this bucket
        else:
            # The retry loop was exhausted without the object counts converging
            raise SyncFailedError(
                f"Sync status is not consistent across sites for the bucket {bucket}"
            )


def elbencho_run_put_workload(each_user, user_buckets, config):
    """Runs an Elbencho PUT workload on an RGW S3 user with parallel execution."""
    log.info("Starting Elbencho PUT workload")
    elbench_install_configure()
    objects_per_bucket = config.test_ops.get("objects_per_bucket")
    object_size = config.test_ops.get("object_size")
    threads = config.test_ops.get("threads")
    is_multisite = utils.is_cluster_multisite()
    local_endpoint = get_endpoint_elbencho()
    remote_endpoint = get_remote_endpoint_elbencho() if is_multisite else None
    log.info(
        f"The local endpoint is {local_endpoint} and the remote endpoint is {remote_endpoint}"
    )
    # On multisite, each site uploads half the objects so that the combined
    # total per bucket matches objects_per_bucket once sync completes.
    objects_per_site = objects_per_bucket // 2 if is_multisite else objects_per_bucket

    for version in range(config.test_ops.get("version_count", 1)):
        log.info(f"Running workload version {version + 1}")
        threads_list = []
        threads_list.append(
            threading.Thread(
                target=run_elbencho,
                args=(
                    local_endpoint,
                    "primary",
                    objects_per_site,
                    user_buckets,
                    each_user,
                    threads,
                    object_size,
                ),
            )
        )
        if is_multisite and remote_endpoint:
            threads_list.append(
                threading.Thread(
                    target=run_elbencho,
                    args=(
                        remote_endpoint,
                        "secondary",
                        objects_per_site,
                        user_buckets,
                        each_user,
                        threads,
                        object_size,
                    ),
                )
            )

        for t in threads_list:
            t.start()
        for t in threads_list:
            t.join()

    if config.test_ops.get("test_bucket_sync", False) is True:
        verify_bucket_sync(user_buckets)

    log.info("PUT workload completed.")

rgw/v2/tests/s3_swift/test_rgw_account_management.py (+27 -6)

@@ -5,7 +5,8 @@
 3. Future work : testing the account quota management
 The script is test_rgw_account_management.py
 The Input yamls are
-# test_account_ownership_change_user_adoption.yaml
+# configs/test_account_ownership_change_user_adoption.yaml
+# multisite_configs/test_rgw_accounts_at_scale.yaml

 Usage is test_rgw_account_management.py -c <path_to_config_file>

@@ -26,10 +27,11 @@
 from v2.lib.exceptions import RGWBaseException, TestExecError
 from v2.lib.resource_op import Config
 from v2.lib.s3.auth import Auth
-from v2.lib.s3.write_io_info import BasicIOInfoStructure, IOInfoInitialize
+from v2.lib.s3.write_io_info import BasicIOInfoStructure, BucketIoInfo, IOInfoInitialize
 from v2.tests.s3_swift import reusable
 from v2.tests.s3_swift.reusables import quota_management as quota_mgmt
 from v2.tests.s3_swift.reusables import rgw_accounts as accounts
+from v2.tests.s3_swift.reusables import rgw_s3_elbencho as elbencho
 from v2.utils.log import configure_logging
 from v2.utils.test_desc import AddTestInfo

@@ -42,25 +44,41 @@
 def test_exec(config, ssh_con):
     io_info_initialize = IOInfoInitialize()
     basic_io_structure = BasicIOInfoStructure()
+    write_bucket_io_info = BucketIoInfo()
+
     io_info_initialize.initialize(basic_io_structure.initial())

-    log.info(f"Creating {config.user_count} users")
-    all_users_info = s3lib.create_users(config.user_count)
+    if config.test_ops.get("test_via_rgw_accounts", False) is True:
+        # create rgw account, account root user, iam user and return iam user details
+        tenant_name = config.test_ops.get("tenant_name")
+        region = config.test_ops.get("region")
+        all_users_info = accounts.create_rgw_account_with_iam_user(
+            config,
+            tenant_name,
+            region,
+        )
+    else:
+        log.info(f"Creating {config.user_count} users")
+        all_users_info = s3lib.create_users(config.user_count)

     for each_user in all_users_info:
         auth = Auth(each_user, ssh_con, ssl=config.ssl)
         rgw_conn = auth.do_auth()
         log.info(f"Creating {config.bucket_count} buckets for {each_user['user_id']}")
+        user_buckets = []  # Store buckets for this user

         for bc in range(config.bucket_count):
             bucket_name = utils.gen_bucket_name_from_userid(
                 each_user["user_id"], rand_no=bc
             )
+            user_buckets.append(bucket_name)
             bucket = reusable.create_bucket(bucket_name, rgw_conn, each_user)

             if config.test_ops.get("enable_version", False):
                 log.info("Enable bucket versioning")
-                reusable.enable_versioning(bucket, rgw_conn, each_user)
+                reusable.enable_versioning(
+                    bucket, rgw_conn, each_user, write_bucket_io_info
+                )

         # First executor for user adoption
         if config.test_ops.get("test_rgwUser_adoption_by_rgwAccount", False):
@@ -142,13 +160,16 @@ def test_exec(config, ssh_con):
         )

         concurrent.futures.wait(futures)
-
         if config.test_ops.get("test_account_ownership_change", False):
             log.info(
                 "Test RGW account ownership change of 2 RGW users belonging to different accounts."
             )
             accounts.account_ownership_change(config)

+        if config.test_ops.get("put_object_elbencho", False):
+            time.sleep(30)
+            elbencho.elbencho_run_put_workload(each_user, user_buckets, config)
+
     crash_info = reusable.check_for_crash()
     if crash_info:
         raise TestExecError("Ceph daemon crash found!")
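A note on the workload shape: run_elbencho collapses the per-user bucket list into a single brace-expansion argument rather than invoking elbencho once per bucket. A standalone illustration with hypothetical bucket names (anything ending in "-<index>" fits the shape the module assumes); with the config above, -N per site would be 1125, i.e. objects_per_bucket: 2250 halved across the two zones:

# Hypothetical bucket names ending in "-<index>", the shape run_elbencho assumes.
buckets = ["user1-bucky-0", "user1-bucky-1", "user1-bucky-2"]

bucket_prefix = "-".join(buckets[0].split("-")[:-1]) + "-"   # "user1-bucky-"
bucket_format = f"{bucket_prefix}{{0..{len(buckets) - 1}}}"  # "user1-bucky-{0..2}"

# Same flags the module passes, with placeholder endpoint and credentials.
cmd = (
    f"time /usr/local/bin/elbencho --s3endpoints http://host:5000 "
    f"--s3key ACCESS --s3secret SECRET "
    f"-w -t 400 -n0 -N 1125 -s 2K {bucket_format}"
)
print(cmd)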
