@@ -615,10 +615,163 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab,
615
615
return 0 ;
616
616
}
617
617
618
+ #define RONDB_INSERT 2
619
+ #define RONDB_UPDATE 1
620
+ #define REG0 0
621
+ #define REG1 1
622
+ #define REG2 2
623
+ #define REG3 3
624
+ #define REG4 4
625
+ #define REG5 5
626
+ #define REG6 6
627
+ #define REG7 7
628
+ #define LABEL0 0
618
629
void incr_key_row (std::string *response,
619
630
Ndb *ndb,
620
631
const NdbDictionary::Table *tab,
621
632
NdbTransaction *trans,
622
633
struct key_table *key_row) {
623
- return ;
634
+
635
+ const NdbDictionary::Column *value_start_col = tab->getColumn (" value_start" );
636
+ const NdbDictionary::Column *tot_value_len_col = tab->getColumn (" tot_value_len" );
637
+
638
+ NdbOperation::OperationOptions opts;
639
+ std::memset (&opts, 0 , sizeof (opts));
640
+ /* *
641
+ * The mask specifies which columns is to be updated after the interpreter
642
+ * has finished. The values are set in the key_row.
643
+ * We have 7 columns, we will update tot_value_len in interpreter, same with
644
+ * value_start.
645
+ *
646
+ * The rest, redis_key, rondb_key, value_data_type, num_rows and expiry_date
647
+ * are updated through final update.
648
+ */
649
+
650
+ const Uint32 mask = 0x57 ;
651
+ const unsigned char *mask_ptr = (const unsigned char *)&mask;
652
+
653
+ // redis_key already set as this is the Primary key
654
+ key_row->null_bits = 1 ; // Set rondb_key to NULL, first NULL column
655
+ key_row->num_rows = 0 ;
656
+ key_row->value_data_type = 0 ;
657
+ key_row->expiry_date = 0 ;
658
+
659
+ /* Define the interpreted program */
660
+ Uint32 code_buffer[128 ];
661
+ NdbInterpretedCode code (tab, &code_buffer[0 ], sizeof (code_buffer));
662
+ code.load_const_u16 (REG0, 4 ); // Memory offset 0
663
+ code.load_const_u16 (REG6, 0 ); // Memory offset 0
664
+ int ret_code = code.load_op_type (REG1); // Read operation type into register 1
665
+ code.branch_eq_const (REG1, RONDB_INSERT, LABEL0); // Inserts go to label 1
666
+
667
+ /* *
668
+ * The first 4 bytes of the memory must be kept for the Attribute header
669
+ * REG0 Memory offset == 4
670
+ * REG1 Memory offset == 6
671
+ * REG2 Size of value_start
672
+ * REG3 Size of value_start without length bytes
673
+ * REG4 Old integer value after conversion
674
+ * REG5 New integer value after increment
675
+ * REG6 Memory offset == 0
676
+ * REG7 not used
677
+ */
678
+ /* UPDATE code */
679
+ code.read_full (value_start_col, REG6, REG2); // Read value_start column
680
+ code.load_const_u16 (REG1, 6 );// Memory offset 2
681
+ code.sub_const_reg (REG3, REG2, 2 );// Subtract 2 from length
682
+ code.str_to_int64 (REG4, REG1, REG3);// Convert string to number into register 6
683
+ code.add_const_reg (REG5, REG4, 1 ); // New integer value in register 6
684
+ code.int64_to_str (REG3, REG1, REG5);// Convert to string
685
+ code.add_const_reg (REG2, REG3, 2 ); // New value_start length
686
+ code.convert_size (REG3, REG0); // Write back length bytes in memory
687
+
688
+ code.write_interpreter_output (REG5, 0 ); // Write into output index 0
689
+ code.write_from_mem (value_start_col, REG6, REG2); // Write to column
690
+ code.write_attr (tot_value_len_col, REG3);
691
+ code.interpret_exit_ok ();
692
+
693
+ /* INSERT code */
694
+ code.def_label (LABEL0);
695
+ code.load_const_u16 (REG5, 1 );
696
+ code.load_const_u16 (REG3, 1 );
697
+ code.write_interpreter_output (REG5, 0 ); // Write into output index 0
698
+
699
+ Uint32 insert_value;
700
+ Uint8 *insert_value_ptr = (Uint8*)&insert_value;
701
+ insert_value_ptr[0 ] = 1 ; // Length is 1
702
+ insert_value_ptr[1 ] = 0 ; // Second length byte is 0
703
+ insert_value_ptr[2 ] = ' 1' ; // Inserts a string '1'
704
+ insert_value_ptr[3 ] = 0 ;
705
+
706
+ code.load_const_mem (REG0, REG2, 3 , &insert_value);// Load to memory
707
+ code.write_from_mem (value_start_col, REG6, REG2); // Write to column
708
+ code.write_attr (tot_value_len_col, REG3);
709
+ code.interpret_exit_ok ();
710
+
711
+ /* Program end, now compile code */
712
+ ret_code = code.finalise ();
713
+ if (ret_code != 0 ) {
714
+ assign_ndb_err_to_response (response,
715
+ " Failed to create Interpreted code" ,
716
+ code.getNdbError ());
717
+ return ;
718
+ }
719
+
720
+ /* Prepare the interpreted program to be part of the write */
721
+ opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED;
722
+ opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT;
723
+ opts.interpretedCode = &code;
724
+
725
+ /* *
726
+ * Prepare to get the final value of the Redis row after INCR is finished
727
+ * This is performed by the reading the pseudo column that is reading the
728
+ * output index written in interpreter program.
729
+ */
730
+ NdbOperation::GetValueSpec getvals[1 ];
731
+ getvals[0 ].appStorage = nullptr ;
732
+ getvals[0 ].recAttr = nullptr ;
733
+ getvals[0 ].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0;
734
+ opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE;
735
+ opts.numExtraGetFinalValues = 1 ;
736
+ opts.extraGetFinalValues = getvals;
737
+
738
+ /* Define the actual operation to be sent to RonDB data node. */
739
+ const NdbOperation *op = trans->writeTuple (
740
+ pk_key_record,
741
+ (const char *)key_row,
742
+ entire_key_record,
743
+ (char *)key_row,
744
+ mask_ptr,
745
+ &opts,
746
+ sizeof (opts));
747
+ if (op == nullptr ) {
748
+ assign_ndb_err_to_response (response,
749
+ " Failed to create NdbOperation" ,
750
+ trans->getNdbError ());
751
+ return ;
752
+ }
753
+
754
+ /* Send to RonDB and execute the INCR operation */
755
+ if (trans->execute (NdbTransaction::Commit,
756
+ NdbOperation::AbortOnError) != 0 ||
757
+ trans->getNdbError ().code != 0 )
758
+ {
759
+ assign_ndb_err_to_response (response,
760
+ FAILED_INCR_KEY,
761
+ trans->getNdbError ());
762
+ return ;
763
+ }
764
+
765
+ /* Retrieve the returned new value as an Int64 value */
766
+ NdbRecAttr *recAttr = getvals[0 ].recAttr ;
767
+ Int64 new_incremented_value = recAttr->int64_value ();
768
+
769
+ /* Send the return message to Redis client */
770
+ char header_buf[20 ];
771
+ int header_len = write_formatted (header_buf,
772
+ sizeof (header_buf),
773
+ " :%lld\r\n " ,
774
+ new_incremented_value);
775
+ response->assign (header_buf);
776
+ return ;
624
777
}
0 commit comments