Skip to content

Commit 875f128

Browse files
authored
Compatibility test v2 (#17565)
1 parent 7e8c644 commit 875f128

File tree

4 files changed

+327
-64
lines changed

4 files changed

+327
-64
lines changed

ydb/tests/functional/compatibility/test_compatibility.py

Lines changed: 303 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,73 @@
11
# -*- coding: utf-8 -*-
22
import boto3
3-
4-
import os
5-
3+
import time
4+
import pytest
5+
import logging
66
import yatest
7+
import os
8+
import json
79
from ydb.tests.library.harness.kikimr_runner import KiKiMR
810
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
911
from ydb.tests.library.harness.param_constants import kikimr_driver_path
1012
from ydb.tests.library.common.types import Erasure
1113
from ydb.tests.oss.ydb_sdk_import import ydb
1214

15+
from decimal import Decimal
16+
17+
18+
last_stable_binary_path = yatest.common.binary_path("ydb/tests/library/compatibility/ydbd-last-stable")
19+
current_binary_path = kikimr_driver_path()
20+
21+
all_binary_combinations = [
22+
[last_stable_binary_path, current_binary_path],
23+
[last_stable_binary_path, [last_stable_binary_path, current_binary_path]],
24+
[current_binary_path, last_stable_binary_path],
25+
[current_binary_path, current_binary_path],
26+
]
27+
all_binary_combinations_ids = [
28+
"last_stable_to_current",
29+
"last_stable_to_current_mixed",
30+
"current_to_last_stable",
31+
"current_to_current",
32+
]
33+
34+
logger = logging.getLogger(__name__)
35+
1336

1437
class TestCompatibility(object):
15-
@classmethod
16-
def setup_class(cls):
17-
last_stable_path = yatest.common.binary_path("ydb/tests/library/compatibility/ydbd-last-stable")
18-
binary_paths = [kikimr_driver_path(), last_stable_path]
19-
cls.cluster = KiKiMR(KikimrConfigGenerator(erasure=Erasure.MIRROR_3_DC, binary_paths=binary_paths))
20-
cls.cluster.start()
21-
cls.endpoint = "%s:%s" % (
22-
cls.cluster.nodes[1].host, cls.cluster.nodes[1].port
38+
@pytest.fixture(autouse=True, params=all_binary_combinations, ids=all_binary_combinations_ids)
39+
def setup(self, request):
40+
self.all_binary_paths = request.param
41+
self.config = KikimrConfigGenerator(
42+
erasure=Erasure.MIRROR_3_DC,
43+
binary_paths=[self.all_binary_paths[0]],
44+
use_in_memory_pdisks=False,
45+
46+
extra_feature_flags={
47+
"suppress_compatibility_check": True,
48+
# "enable_table_datetime64": True # uncomment for 64 datetime in tpc-h/tpc-ds
49+
},
50+
column_shard_config={
51+
'disabled_on_scheme_shard': False,
52+
},
2353
)
24-
cls.driver = ydb.Driver(
54+
55+
self.cluster = KiKiMR(self.config)
56+
self.cluster.start()
57+
self.endpoint = "grpc://%s:%s" % ('localhost', self.cluster.nodes[1].port)
58+
output_path = yatest.common.test_output_path()
59+
self.output_f = open(os.path.join(output_path, "out.log"), "w")
60+
self.s3_config = self.setup_s3()
61+
62+
self.driver = ydb.Driver(
2563
ydb.DriverConfig(
2664
database='/Root',
27-
endpoint=cls.endpoint
65+
endpoint=self.endpoint
2866
)
2967
)
30-
cls.driver.wait()
31-
output_path = yatest.common.test_output_path()
32-
cls.output_f = open(os.path.join(output_path, "out.log"), "w")
33-
34-
cls.s3_config = cls.setup_s3()
35-
36-
@classmethod
37-
def teardown_class(cls):
38-
if hasattr(cls, 'driver'):
39-
cls.driver.stop()
40-
41-
if hasattr(cls, 'cluster'):
42-
cls.cluster.stop(kill=True) # TODO fix
68+
self.driver.wait()
69+
yield
70+
self.cluster.stop()
4371

4472
@staticmethod
4573
def setup_s3():
@@ -56,48 +84,259 @@ def setup_s3():
5684

5785
return s3_endpoint, s3_access_key, s3_secret_key, s3_bucket
5886

59-
def test_simple(self):
60-
session = ydb.retry_operation_sync(lambda: self.driver.table_client.session().create())
87+
def change_cluster_version(self, new_binary_paths):
88+
binary_path_before = self.config.get_binary_paths()
89+
versions_on_before = self.get_nodes_version()
90+
if isinstance(new_binary_paths, str):
91+
new_binary_paths = [new_binary_paths]
92+
elif not isinstance(new_binary_paths, list):
93+
raise ValueError("binary_paths must be a string or a list of strings")
94+
self.config.set_binary_paths(new_binary_paths)
95+
self.cluster.update_nodes_configurator(self.config)
96+
time.sleep(60)
97+
versions_on_after = self.get_nodes_version()
98+
if binary_path_before != new_binary_paths:
99+
assert versions_on_before != versions_on_after, f'Versions on before and after should be different: {versions_on_before} {versions_on_after}'
100+
else:
101+
assert versions_on_before == versions_on_after, f'Versions on before and after should be the same: {versions_on_before} {versions_on_after}'
102+
103+
def get_nodes_version(self):
104+
versions = []
105+
for node_id, node in enumerate(self.cluster.nodes.values()):
106+
node.get_config_version()
107+
get_version_command = [
108+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
109+
"--verbose",
110+
"--endpoint",
111+
"grpc://localhost:%d" % node.grpc_port,
112+
"--database=/Root",
113+
"yql",
114+
"--script",
115+
f'select version() as node_{node_id}_version',
116+
'--format',
117+
'json-unicode'
118+
]
119+
result = yatest.common.execute(get_version_command, wait=True)
120+
result_data = json.loads(result.std_out.decode('utf-8'))
121+
logger.debug(f'node_{node_id}_version": {result_data}')
122+
node_version_key = f"node_{node_id}_version"
123+
if node_version_key in result_data:
124+
node_version = result_data[node_version_key]
125+
versions.append(node_version)
126+
else:
127+
print(f"Key {node_version_key} not found in the result.")
128+
return versions
129+
130+
def check_table_exists(driver, table_path):
131+
try:
132+
driver.scheme_client.describe_table(table_path)
133+
return True
134+
except ydb.SchemeError as e:
135+
if e.issue_code == ydb.IssueCode.SCHEME_ERROR_NO_SUCH_TABLE:
136+
return False
137+
else:
138+
raise
139+
140+
def exec_query(self, query: str):
141+
command = [
142+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
143+
"--verbose",
144+
"-e",
145+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
146+
"-d",
147+
"/Root",
148+
"yql",
149+
"--script",
150+
f"{query}"
151+
]
152+
yatest.common.execute(command, wait=True, stdout=self.output_f)
153+
154+
def execute_scan_query(self, query_body):
155+
query = ydb.ScanQuery(query_body, {})
156+
it = self.driver.table_client.scan_query(query)
157+
result_set = []
158+
159+
try:
160+
while True:
161+
result = next(it)
162+
result_set.extend(result.result_set.rows)
163+
except StopIteration:
164+
pass
165+
166+
return result_set
167+
168+
def log_database_scheme(self):
169+
get_scheme_command = [
170+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
171+
"--verbose",
172+
"-e",
173+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
174+
"-d",
175+
"/Root",
176+
"scheme",
177+
"ls",
178+
"-l",
179+
"-R"
180+
]
181+
yatest.common.execute(get_scheme_command, wait=True, stdout=self.output_f)
182+
183+
@pytest.mark.parametrize("store_type", ["row", "column"])
184+
def test_simple(self, store_type):
185+
def read_update_data(self, iteration_count=1, start_index=0):
186+
id_ = start_index
61187

62-
with ydb.SessionPool(self.driver, size=1) as pool:
63-
with pool.checkout() as session:
64-
session.execute_scheme(
65-
"create table `sample_table` (id Uint64, value Uint64, payload Utf8, PRIMARY KEY(id)) WITH (AUTO_PARTITIONING_BY_SIZE = ENABLED, AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"
188+
upsert_count = 200
189+
iteration_count = iteration_count
190+
for i in range(iteration_count):
191+
rows = []
192+
for j in range(upsert_count):
193+
row = {}
194+
row["id"] = id_
195+
row["value"] = 1
196+
row["payload"] = "DEADBEEF" * 1024 * 16 # 128 kb
197+
row["income"] = Decimal("123.001").quantize(Decimal('0.000000000'))
198+
199+
rows.append(row)
200+
id_ += 1
201+
202+
column_types = ydb.BulkUpsertColumns()
203+
column_types.add_column("id", ydb.PrimitiveType.Uint64)
204+
column_types.add_column("value", ydb.PrimitiveType.Uint64)
205+
column_types.add_column("payload", ydb.PrimitiveType.Utf8)
206+
column_types.add_column("income", ydb.DecimalType())
207+
self.driver.table_client.bulk_upsert(
208+
"Root/sample_table", rows, column_types
66209
)
67-
id_ = 0
68-
69-
upsert_count = 200
70-
iteration_count = 1
71-
for i in range(iteration_count):
72-
rows = []
73-
for j in range(upsert_count):
74-
row = {}
75-
row["id"] = id_
76-
row["value"] = 1
77-
row["payload"] = "DEADBEEF" * 1024 * 16 # 128 kb
78-
rows.append(row)
79-
id_ += 1
80-
81-
column_types = ydb.BulkUpsertColumns()
82-
column_types.add_column("id", ydb.PrimitiveType.Uint64)
83-
column_types.add_column("value", ydb.PrimitiveType.Uint64)
84-
column_types.add_column("payload", ydb.PrimitiveType.Utf8)
85-
self.driver.table_client.bulk_upsert(
86-
"Root/sample_table", rows, column_types
210+
211+
query_body = "SELECT SUM(value) as sum_value from `sample_table`"
212+
query = ydb.ScanQuery(query_body, {})
213+
it = self.driver.table_client.scan_query(query)
214+
result_set = []
215+
216+
while True:
217+
try:
218+
result = next(it)
219+
result_set = result_set + result.result_set.rows
220+
except StopIteration:
221+
break
222+
223+
for row in result_set:
224+
print(" ".join([str(x) for x in list(row.values())]))
225+
226+
assert len(result_set) == 1
227+
assert len(result_set[0]) == 1
228+
result = list(result_set)
229+
assert len(result) == 1
230+
assert result[0]['sum_value'] == upsert_count * iteration_count + start_index
231+
232+
def create_table_column(self):
233+
with ydb.SessionPool(self.driver, size=1) as pool:
234+
with pool.checkout() as session:
235+
session.execute_scheme(
236+
"""create table `sample_table` (
237+
id Uint64 NOT NULL, value Uint64,
238+
payload Utf8, income Decimal(22,9),
239+
PRIMARY KEY(id)
240+
) WITH (
241+
STORE = COLUMN,
242+
AUTO_PARTITIONING_BY_SIZE = ENABLED,
243+
AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
87244
)
88245

89-
query = "SELECT SUM(value) from sample_table"
90-
result_sets = session.transaction().execute(
91-
query, commit_tx=True
92-
)
93-
for row in result_sets[0].rows:
94-
print(" ".join([str(x) for x in list(row.values())]))
95-
96-
assert len(result_sets) == 1
97-
assert len(result_sets[0].rows) == 1
98-
result = list(result_sets[0].rows[0].values())
99-
assert len(result) == 1
100-
assert result[0] == upsert_count * iteration_count
246+
def create_table_row(self):
247+
with ydb.SessionPool(self.driver, size=1) as pool:
248+
with pool.checkout() as session:
249+
session.execute_scheme(
250+
"""create table `sample_table` (
251+
id Uint64, value Uint64,
252+
payload Utf8, income Decimal(22,9),
253+
PRIMARY KEY(id)
254+
) WITH (
255+
AUTO_PARTITIONING_BY_SIZE = ENABLED,
256+
AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
257+
)
258+
259+
create_table_row(self) if store_type == "row" else create_table_column(self)
260+
read_update_data(self)
261+
self.change_cluster_version(self.all_binary_paths[1])
262+
assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 200, 'Expected 200 rows after update version'
263+
read_update_data(self, iteration_count=2, start_index=100)
264+
assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 500, 'Expected 500 rows: update 100-200 rows and added 300 rows'
265+
266+
@pytest.mark.parametrize("store_type", ["row", "column"])
267+
def test_tpch1(self, store_type):
268+
result_json_path = os.path.join(yatest.common.test_output_path(), "result.json")
269+
query_output_path = os.path.join(yatest.common.test_output_path(), "query_output.json")
270+
init_command = [
271+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
272+
"--verbose",
273+
"--endpoint",
274+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
275+
"--database=/Root",
276+
"workload",
277+
"tpch",
278+
"-p",
279+
"tpch",
280+
"init",
281+
"--store={}".format(store_type),
282+
"--datetime", # use 32 bit dates instead of 64 (not supported in 24-4)
283+
"--partition-size=25",
284+
]
285+
import_command = [
286+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
287+
"--verbose",
288+
"--endpoint",
289+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
290+
"--database=/Root",
291+
"workload",
292+
"tpch",
293+
"-p",
294+
"tpch",
295+
"import",
296+
"generator",
297+
"--scale=1",
298+
]
299+
run_command = [
300+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
301+
"--verbose",
302+
"--endpoint",
303+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
304+
"--database=/Root",
305+
"workload",
306+
"tpch",
307+
"-p",
308+
"tpch",
309+
"run",
310+
"--scale=1",
311+
"--exclude",
312+
"17", # not working for row tables
313+
"--check-canonical",
314+
"--retries",
315+
"5", # in row tables we have to retry query by design
316+
"--json",
317+
result_json_path,
318+
"--output",
319+
query_output_path,
320+
]
321+
clean_command = [
322+
yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")),
323+
"--verbose",
324+
"--endpoint",
325+
"grpc://localhost:%d" % self.cluster.nodes[1].port,
326+
"--database=/Root",
327+
"workload",
328+
"tpch",
329+
"-p",
330+
"tpch",
331+
"clean"
332+
]
333+
334+
yatest.common.execute(init_command, wait=True, stdout=self.output_f)
335+
yatest.common.execute(import_command, wait=True, stdout=self.output_f)
336+
yatest.common.execute(run_command, wait=True, stdout=self.output_f)
337+
self.change_cluster_version(self.all_binary_paths[1])
338+
yatest.common.execute(run_command, wait=True, stdout=self.output_f)
339+
yatest.common.execute(clean_command, wait=True, stdout=self.output_f)
101340

102341
def test_export(self):
103342
s3_endpoint, s3_access_key, s3_secret_key, s3_bucket = self.s3_config

0 commit comments

Comments
 (0)