diff --git a/ydb/tests/functional/compatibility/test_compatibility.py b/ydb/tests/functional/compatibility/test_compatibility.py index 10d8e15bc33a..c59ee879b462 100644 --- a/ydb/tests/functional/compatibility/test_compatibility.py +++ b/ydb/tests/functional/compatibility/test_compatibility.py @@ -20,10 +20,10 @@ current_binary_path = kikimr_driver_path() all_binary_combinations = [ - [last_stable_binary_path, current_binary_path], - [last_stable_binary_path, [last_stable_binary_path, current_binary_path]], - [current_binary_path, last_stable_binary_path], - [current_binary_path, current_binary_path], + [[last_stable_binary_path], [current_binary_path]], + [[last_stable_binary_path], [last_stable_binary_path, current_binary_path]], + [[current_binary_path], [last_stable_binary_path]], + [[current_binary_path], [current_binary_path]], ] all_binary_combinations_ids = [ "last_stable_to_current", @@ -41,7 +41,7 @@ def setup(self, request): self.all_binary_paths = request.param self.config = KikimrConfigGenerator( erasure=Erasure.MIRROR_3_DC, - binary_paths=[self.all_binary_paths[0]], + binary_paths=self.all_binary_paths[0], use_in_memory_pdisks=False, extra_feature_flags={ @@ -95,71 +95,10 @@ def _execute_command_and_get_result(self, command): return result def change_cluster_version(self, new_binary_paths): - binary_path_before = self.config.get_binary_paths() - versions_on_before = self.get_nodes_version() - if isinstance(new_binary_paths, str): - new_binary_paths = [new_binary_paths] - elif not isinstance(new_binary_paths, list): - raise ValueError("binary_paths must be a string or a list of strings") self.config.set_binary_paths(new_binary_paths) - self.cluster.update_nodes_configurator(self.config) + self.cluster.update_configurator_and_restart(self.config) + # TODO: replace with `self.driver.wait()` time.sleep(60) - versions_on_after = self.get_nodes_version() - if binary_path_before != new_binary_paths: - assert versions_on_before != versions_on_after, f'Versions on before and after should be different: {versions_on_before} {versions_on_after}' - else: - assert versions_on_before == versions_on_after, f'Versions on before and after should be the same: {versions_on_before} {versions_on_after}' - - def get_nodes_version(self): - versions = [] - for node_id, node in enumerate(self.cluster.nodes.values()): - node.get_config_version() - get_version_command = [ - yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), - "--verbose", - "--endpoint", - "grpc://localhost:%d" % node.grpc_port, - "--database=/Root", - "yql", - "--script", - f'select version() as node_{node_id}_version', - '--format', - 'json-unicode' - ] - result = yatest.common.execute(get_version_command, wait=True) - result_data = json.loads(result.std_out.decode('utf-8')) - logger.debug(f'node_{node_id}_version": {result_data}') - node_version_key = f"node_{node_id}_version" - if node_version_key in result_data: - node_version = result_data[node_version_key] - versions.append(node_version) - else: - print(f"Key {node_version_key} not found in the result.") - return versions - - def check_table_exists(driver, table_path): - try: - driver.scheme_client.describe_table(table_path) - return True - except ydb.SchemeError as e: - if e.issue_code == ydb.IssueCode.SCHEME_ERROR_NO_SUCH_TABLE: - return False - else: - raise - - def exec_query(self, query: str): - command = [ - yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), - "--verbose", - "-e", - "grpc://localhost:%d" % self.cluster.nodes[1].port, - "-d", - "/Root", - "yql", - "--script", - f"{query}" - ] - yatest.common.execute(command, wait=True, stdout=self.output_f) def execute_scan_query(self, query_body): query = ydb.ScanQuery(query_body, {}) @@ -175,24 +114,9 @@ def execute_scan_query(self, query_body): return result_set - def log_database_scheme(self): - get_scheme_command = [ - yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), - "--verbose", - "-e", - "grpc://localhost:%d" % self.cluster.nodes[1].port, - "-d", - "/Root", - "scheme", - "ls", - "-l", - "-R" - ] - yatest.common.execute(get_scheme_command, wait=True, stdout=self.output_f) - @pytest.mark.parametrize("store_type", ["row", "column"]) def test_simple(self, store_type): - def read_update_data(self, iteration_count=1, start_index=0): + def upsert_and_check_sum(self, iteration_count=1, start_index=0): id_ = start_index upsert_count = 200 @@ -219,27 +143,9 @@ def read_update_data(self, iteration_count=1, start_index=0): ) query_body = "SELECT SUM(value) as sum_value from `sample_table`" - query = ydb.ScanQuery(query_body, {}) - it = self.driver.table_client.scan_query(query) - result_set = [] + assert self.execute_scan_query(query_body)[0]['sum_value'] == upsert_count * iteration_count + start_index - while True: - try: - result = next(it) - result_set = result_set + result.result_set.rows - except StopIteration: - break - - for row in result_set: - print(" ".join([str(x) for x in list(row.values())])) - - assert len(result_set) == 1 - assert len(result_set[0]) == 1 - result = list(result_set) - assert len(result) == 1 - assert result[0]['sum_value'] == upsert_count * iteration_count + start_index - - def create_table_column(self): + def create_table(self, store_type): with ydb.SessionPool(self.driver, size=1) as pool: with pool.checkout() as session: session.execute_scheme( @@ -248,29 +154,16 @@ def create_table_column(self): payload Utf8, income Decimal(22,9), PRIMARY KEY(id) ) WITH ( - STORE = COLUMN, - AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);""" - ) - - def create_table_row(self): - with ydb.SessionPool(self.driver, size=1) as pool: - with pool.checkout() as session: - session.execute_scheme( - """create table `sample_table` ( - id Uint64, value Uint64, - payload Utf8, income Decimal(22,9), - PRIMARY KEY(id) - ) WITH ( + STORE = {store_type}, AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);""" + AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);""".format(store_type=store_type.upper()) ) - create_table_row(self) if store_type == "row" else create_table_column(self) - read_update_data(self) + create_table(self, store_type) + upsert_and_check_sum(self) self.change_cluster_version(self.all_binary_paths[1]) assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 200, 'Expected 200 rows after update version' - read_update_data(self, iteration_count=2, start_index=100) + upsert_and_check_sum(self, iteration_count=2, start_index=100) assert self.execute_scan_query('select count(*) as row_count from `sample_table`')[0]['row_count'] == 500, 'Expected 500 rows: update 100-200 rows and added 300 rows' @pytest.mark.parametrize("store_type", ["row", "column"]) diff --git a/ydb/tests/library/harness/kikimr_runner.py b/ydb/tests/library/harness/kikimr_runner.py index 1b5f95cad3f0..104a095a358d 100644 --- a/ydb/tests/library/harness/kikimr_runner.py +++ b/ydb/tests/library/harness/kikimr_runner.py @@ -591,7 +591,7 @@ def start_node(self, node_id): except Exception as e: raise RuntimeError("Failed to start node %s: %s" % (str(node_id), str(e))) - def update_nodes_configurator(self, configurator): + def update_configurator_and_restart(self, configurator): for node in self.nodes.values(): node.stop() self.__configurator = configurator