20
20
current_binary_path = kikimr_driver_path ()
21
21
22
22
all_binary_combinations = [
23
- [last_stable_binary_path , current_binary_path ],
24
- [last_stable_binary_path , [last_stable_binary_path , current_binary_path ]],
25
- [current_binary_path , last_stable_binary_path ],
26
- [current_binary_path , current_binary_path ],
23
+ [[ last_stable_binary_path ], [ current_binary_path ] ],
24
+ [[ last_stable_binary_path ] , [last_stable_binary_path , current_binary_path ]],
25
+ [[ current_binary_path ], [ last_stable_binary_path ] ],
26
+ [[ current_binary_path ], [ current_binary_path ] ],
27
27
]
28
28
all_binary_combinations_ids = [
29
29
"last_stable_to_current" ,
@@ -41,7 +41,7 @@ def setup(self, request):
41
41
self .all_binary_paths = request .param
42
42
self .config = KikimrConfigGenerator (
43
43
erasure = Erasure .MIRROR_3_DC ,
44
- binary_paths = [ self .all_binary_paths [0 ] ],
44
+ binary_paths = self .all_binary_paths [0 ],
45
45
use_in_memory_pdisks = False ,
46
46
47
47
extra_feature_flags = {
@@ -95,71 +95,10 @@ def _execute_command_and_get_result(self, command):
95
95
return result
96
96
97
97
def change_cluster_version (self , new_binary_paths ):
98
- binary_path_before = self .config .get_binary_paths ()
99
- versions_on_before = self .get_nodes_version ()
100
- if isinstance (new_binary_paths , str ):
101
- new_binary_paths = [new_binary_paths ]
102
- elif not isinstance (new_binary_paths , list ):
103
- raise ValueError ("binary_paths must be a string or a list of strings" )
104
98
self .config .set_binary_paths (new_binary_paths )
105
- self .cluster .update_nodes_configurator (self .config )
99
+ self .cluster .update_configurator_and_restart (self .config )
100
+ # TODO: replace with `self.driver.wait()`
106
101
time .sleep (60 )
107
- versions_on_after = self .get_nodes_version ()
108
- if binary_path_before != new_binary_paths :
109
- assert versions_on_before != versions_on_after , f'Versions on before and after should be different: { versions_on_before } { versions_on_after } '
110
- else :
111
- assert versions_on_before == versions_on_after , f'Versions on before and after should be the same: { versions_on_before } { versions_on_after } '
112
-
113
- def get_nodes_version (self ):
114
- versions = []
115
- for node_id , node in enumerate (self .cluster .nodes .values ()):
116
- node .get_config_version ()
117
- get_version_command = [
118
- yatest .common .binary_path (os .getenv ("YDB_CLI_BINARY" )),
119
- "--verbose" ,
120
- "--endpoint" ,
121
- "grpc://localhost:%d" % node .grpc_port ,
122
- "--database=/Root" ,
123
- "yql" ,
124
- "--script" ,
125
- f'select version() as node_{ node_id } _version' ,
126
- '--format' ,
127
- 'json-unicode'
128
- ]
129
- result = yatest .common .execute (get_version_command , wait = True )
130
- result_data = json .loads (result .std_out .decode ('utf-8' ))
131
- logger .debug (f'node_{ node_id } _version": { result_data } ' )
132
- node_version_key = f"node_{ node_id } _version"
133
- if node_version_key in result_data :
134
- node_version = result_data [node_version_key ]
135
- versions .append (node_version )
136
- else :
137
- print (f"Key { node_version_key } not found in the result." )
138
- return versions
139
-
140
- def check_table_exists (driver , table_path ):
141
- try :
142
- driver .scheme_client .describe_table (table_path )
143
- return True
144
- except ydb .SchemeError as e :
145
- if e .issue_code == ydb .IssueCode .SCHEME_ERROR_NO_SUCH_TABLE :
146
- return False
147
- else :
148
- raise
149
-
150
- def exec_query (self , query : str ):
151
- command = [
152
- yatest .common .binary_path (os .getenv ("YDB_CLI_BINARY" )),
153
- "--verbose" ,
154
- "-e" ,
155
- "grpc://localhost:%d" % self .cluster .nodes [1 ].port ,
156
- "-d" ,
157
- "/Root" ,
158
- "yql" ,
159
- "--script" ,
160
- f"{ query } "
161
- ]
162
- yatest .common .execute (command , wait = True , stdout = self .output_f )
163
102
164
103
def execute_scan_query (self , query_body ):
165
104
query = ydb .ScanQuery (query_body , {})
@@ -175,24 +114,9 @@ def execute_scan_query(self, query_body):
175
114
176
115
return result_set
177
116
178
- def log_database_scheme (self ):
179
- get_scheme_command = [
180
- yatest .common .binary_path (os .getenv ("YDB_CLI_BINARY" )),
181
- "--verbose" ,
182
- "-e" ,
183
- "grpc://localhost:%d" % self .cluster .nodes [1 ].port ,
184
- "-d" ,
185
- "/Root" ,
186
- "scheme" ,
187
- "ls" ,
188
- "-l" ,
189
- "-R"
190
- ]
191
- yatest .common .execute (get_scheme_command , wait = True , stdout = self .output_f )
192
-
193
117
@pytest .mark .parametrize ("store_type" , ["row" , "column" ])
194
118
def test_simple (self , store_type ):
195
- def read_update_data (self , iteration_count = 1 , start_index = 0 ):
119
+ def upsert_and_check_sum (self , iteration_count = 1 , start_index = 0 ):
196
120
id_ = start_index
197
121
198
122
upsert_count = 200
@@ -219,27 +143,9 @@ def read_update_data(self, iteration_count=1, start_index=0):
219
143
)
220
144
221
145
query_body = "SELECT SUM(value) as sum_value from `sample_table`"
222
- query = ydb .ScanQuery (query_body , {})
223
- it = self .driver .table_client .scan_query (query )
224
- result_set = []
146
+ assert self .execute_scan_query (query_body )[0 ]['sum_value' ] == upsert_count * iteration_count + start_index
225
147
226
- while True :
227
- try :
228
- result = next (it )
229
- result_set = result_set + result .result_set .rows
230
- except StopIteration :
231
- break
232
-
233
- for row in result_set :
234
- print (" " .join ([str (x ) for x in list (row .values ())]))
235
-
236
- assert len (result_set ) == 1
237
- assert len (result_set [0 ]) == 1
238
- result = list (result_set )
239
- assert len (result ) == 1
240
- assert result [0 ]['sum_value' ] == upsert_count * iteration_count + start_index
241
-
242
- def create_table_column (self ):
148
+ def create_table (self , store_type ):
243
149
with ydb .SessionPool (self .driver , size = 1 ) as pool :
244
150
with pool .checkout () as session :
245
151
session .execute_scheme (
@@ -248,29 +154,16 @@ def create_table_column(self):
248
154
payload Utf8, income Decimal(22,9),
249
155
PRIMARY KEY(id)
250
156
) WITH (
251
- STORE = COLUMN,
252
- AUTO_PARTITIONING_BY_SIZE = ENABLED,
253
- AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
254
- )
255
-
256
- def create_table_row (self ):
257
- with ydb .SessionPool (self .driver , size = 1 ) as pool :
258
- with pool .checkout () as session :
259
- session .execute_scheme (
260
- """create table `sample_table` (
261
- id Uint64, value Uint64,
262
- payload Utf8, income Decimal(22,9),
263
- PRIMARY KEY(id)
264
- ) WITH (
157
+ STORE = {store_type},
265
158
AUTO_PARTITIONING_BY_SIZE = ENABLED,
266
- AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);"""
159
+ AUTO_PARTITIONING_PARTITION_SIZE_MB = 1);""" . format ( store_type = store_type . upper ())
267
160
)
268
161
269
- create_table_row (self ) if store_type == "row" else create_table_column ( self )
270
- read_update_data (self )
162
+ create_table (self , store_type )
163
+ upsert_and_check_sum (self )
271
164
self .change_cluster_version (self .all_binary_paths [1 ])
272
165
assert self .execute_scan_query ('select count(*) as row_count from `sample_table`' )[0 ]['row_count' ] == 200 , 'Expected 200 rows after update version'
273
- read_update_data (self , iteration_count = 2 , start_index = 100 )
166
+ upsert_and_check_sum (self , iteration_count = 2 , start_index = 100 )
274
167
assert self .execute_scan_query ('select count(*) as row_count from `sample_table`' )[0 ]['row_count' ] == 500 , 'Expected 500 rows: update 100-200 rows and added 300 rows'
275
168
276
169
@pytest .mark .parametrize ("store_type" , ["row" , "column" ])
0 commit comments