@@ -162,6 +162,7 @@ void rondb_set(
162
162
Uint32 value_len = argv[arg_index_start + 1 ].size ();
163
163
char varsize_param[EXTENSION_VALUE_LEN + 500 ];
164
164
Uint32 num_value_rows = 0 ;
165
+ Uint32 prev_num_rows = 0 ;
165
166
Uint64 rondb_key = 0 ;
166
167
167
168
if (value_len > INLINE_VALUE_LEN)
@@ -191,7 +192,6 @@ void rondb_set(
191
192
192
193
int ret_code = 0 ;
193
194
ret_code = create_key_row (response,
194
- ndb,
195
195
tab,
196
196
trans,
197
197
redis_key_id,
@@ -201,8 +201,8 @@ void rondb_set(
201
201
value_str,
202
202
value_len,
203
203
num_value_rows,
204
- Uint32 ( 0 ) ,
205
- &varsize_param[ 0 ] );
204
+ prev_num_rows ,
205
+ Uint32 ( 0 ) );
206
206
if (ret_code != 0 )
207
207
{
208
208
// Often unnecessary since it already failed to commit
@@ -226,41 +226,68 @@ void rondb_set(
226
226
assign_ndb_err_to_response (response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError ());
227
227
return ;
228
228
}
229
- if (delete_and_insert_key_row (response,
230
- ndb,
231
- tab,
232
- trans,
233
- redis_key_id,
234
- rondb_key,
235
- key_str,
236
- key_len,
237
- value_str,
238
- value_len,
239
- num_value_rows,
240
- Uint32 (0 ),
241
- &varsize_param[0 ]) != 0 )
242
- {
243
- ndb->closeTransaction (trans);
244
- return ;
245
- }
246
- }
247
-
248
- if (num_value_rows == 0 )
249
- {
229
+ /* *
230
+ * We don't know the exact number of value rows, but we know that it is
231
+ * at least one.
232
+ */
233
+ prev_num_rows = 1 ;
234
+ ret_code = create_key_row (response,
235
+ tab,
236
+ trans,
237
+ redis_key_id,
238
+ rondb_key,
239
+ key_str,
240
+ key_len,
241
+ value_str,
242
+ value_len,
243
+ num_value_rows,
244
+ prev_num_rows,
245
+ Uint32 (0 ));
246
+ } else if (num_value_rows == 0 ) {
250
247
ndb->closeTransaction (trans);
251
248
response->append (" +OK\r\n " );
252
249
return ;
253
250
}
254
- create_all_value_rows (response,
255
- ndb,
256
- dict,
257
- trans,
258
- rondb_key,
259
- value_str,
260
- value_len,
261
- num_value_rows,
262
- &varsize_param[0 ]);
251
+ /* *
252
+ * Coming here means that we either have to add new value rows or we have
253
+ * to delete previous value rows or both. Thus the transaction is still
254
+ * open. We start by creating the new value rows. Next we delete the
255
+ * remaining value rows from the previous instantiation of the row.
256
+ */
257
+ if (num_value_rows > 0 ) {
258
+ ret_code = create_all_value_rows (response,
259
+ ndb,
260
+ dict,
261
+ trans,
262
+ rondb_key,
263
+ value_str,
264
+ value_len,
265
+ num_value_rows,
266
+ &varsize_param[0 ]);
267
+ }
268
+ if (ret_code != 0 ) {
269
+ return ;
270
+ }
271
+ ret_code = delete_value_rows (response,
272
+ tab,
273
+ trans,
274
+ rondb_key,
275
+ num_value_rows,
276
+ prev_num_rows);
277
+ if (ret_code != 0 ) {
278
+ return ;
279
+ }
280
+ if (trans->execute (NdbTransaction::Commit,
281
+ NdbOperation::AbortOnError) == 0 &&
282
+ trans->getNdbError ().code != 0 )
283
+ {
284
+ assign_ndb_err_to_response (response,
285
+ FAILED_EXEC_TXN,
286
+ trans->getNdbError ());
287
+ return ;
288
+ }
263
289
ndb->closeTransaction (trans);
290
+ response->append (" +OK\r\n " );
264
291
return ;
265
292
}
266
293
0 commit comments