Skip to content

Commit

Permalink
feat: support IncludeValue for get operation
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Mar 5, 2025
1 parent d434094 commit 76dbf88
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
10 changes: 6 additions & 4 deletions oxia/async_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (c *clientImpl) Get(key string, options ...GetOption) <-chan GetResult {
if opts.comparisonType == proto.KeyComparisonType_EQUAL || opts.partitionKey != nil {
c.doSingleShardGet(key, opts, ch)
} else {
c.doFloorCeilingGet(key, opts.comparisonType, ch)
c.doFloorCeilingGet(key, opts, ch)
}

return ch
Expand All @@ -248,6 +248,7 @@ func (c *clientImpl) doSingleShardGet(key string, opts *getOptions, ch chan GetR
c.readBatchManager.Get(shardId).Add(model.GetCall{
Key: key,
ComparisonType: opts.comparisonType,
IncludeValue: opts.includeValue,
Callback: func(response *proto.GetResponse, err error) {
ch <- toGetResult(response, key, err)
close(ch)
Expand All @@ -256,7 +257,7 @@ func (c *clientImpl) doSingleShardGet(key string, opts *getOptions, ch chan GetR
}

// The keys might get hashed to multiple shards, so we have to check on all shards and then compare the results.
func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyComparisonType, ch chan GetResult) {
func (c *clientImpl) doFloorCeilingGet(key string, options *getOptions, ch chan GetResult) {
m := sync.Mutex{}
var results []*proto.GetResponse
shards := c.shardManager.GetAll()
Expand All @@ -265,7 +266,8 @@ func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyCompa
for _, shardId := range shards {
c.readBatchManager.Get(shardId).Add(model.GetCall{
Key: key,
ComparisonType: comparisonType,
ComparisonType: options.comparisonType,
IncludeValue: options.includeValue,
Callback: func(response *proto.GetResponse, err error) {
m.Lock()
defer m.Unlock()
Expand All @@ -283,7 +285,7 @@ func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyCompa
counter--
if counter == 0 {
// We have responses from all the shards
processAllGetResponses(key, results, comparisonType, ch)
processAllGetResponses(key, results, options.comparisonType, ch)
}
},
})
Expand Down
38 changes: 38 additions & 0 deletions oxia/async_client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,41 @@ func TestGetValueWithSessionId(t *testing.T) {
assert.NotEqualValues(t, 0, r0.Version.SessionId)
assert.EqualValues(t, r1.Version.SessionId, r0.Version.SessionId)
}

func TestGetWithoutValue(t *testing.T) {
standaloneServer, err := server.NewStandalone(server.NewTestConfig(t.TempDir()))
assert.NoError(t, err)

serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort())
client, err := NewAsyncClient(serviceAddress)
assert.NoError(t, err)

key := "stream"

var keys []string

putResult := <-client.Put(key, []byte("0"), PartitionKey(key), SequenceKeysDeltas(1))
assert.NotNil(t, putResult.Key)
assert.NoError(t, putResult.Err)
keys = append(keys, putResult.Key)

putResult = <-client.Put(key, []byte("1"), PartitionKey(key), SequenceKeysDeltas(1))
assert.NotNil(t, putResult.Key)
assert.NoError(t, putResult.Err)
keys = append(keys, putResult.Key)

for _, subKey := range keys {
result := <-client.Get(subKey, PartitionKey(key), IncludeValue(true))
assert.NotNil(t, result.Value)
result = <-client.Get(subKey, PartitionKey(key), IncludeValue(false))
assert.Nil(t, result.Value)
}

result := <-client.Get(keys[0], PartitionKey(key), IncludeValue(false), ComparisonHigher())
assert.Nil(t, result.Value)
assert.Equal(t, result.Key, keys[1])

result = <-client.Get(keys[1], PartitionKey(key), IncludeValue(false), ComparisonLower())
assert.Nil(t, result.Value)
assert.Equal(t, result.Key, keys[0])
}
3 changes: 2 additions & 1 deletion oxia/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type DeleteRangeCall struct {
type GetCall struct {
Key string
ComparisonType proto.KeyComparisonType
IncludeValue bool
Callback func(*proto.GetResponse, error)
}

Expand Down Expand Up @@ -79,7 +80,7 @@ func (r GetCall) ToProto() *proto.GetRequest {
return &proto.GetRequest{
Key: r.Key,
ComparisonType: r.ComparisonType,
IncludeValue: true,
IncludeValue: r.IncludeValue,
}
}

Expand Down
20 changes: 19 additions & 1 deletion oxia/options_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "github.com/streamnative/oxia/proto"
type getOptions struct {
baseOptions
comparisonType proto.KeyComparisonType
includeValue bool
}

// GetOption represents an option for the [SyncClient.Get] operation.
Expand All @@ -27,7 +28,9 @@ type GetOption interface {
}

func newGetOptions(opts []GetOption) *getOptions {
getOpts := &getOptions{}
getOpts := &getOptions{
includeValue: true,
}
for _, opt := range opts {
opt.applyGet(getOpts)
}
Expand Down Expand Up @@ -70,3 +73,18 @@ func ComparisonLower() GetOption {
func ComparisonHigher() GetOption {
return &getComparisonType{proto.KeyComparisonType_HIGHER}
}

type includeValue struct {
includeValue bool
}

func (t *includeValue) applyGet(opts *getOptions) {
opts.includeValue = t.includeValue
}

// IncludeValue is a function that creates a GetOption for including or excluding a value.
func IncludeValue(include bool) GetOption {
return &includeValue{
includeValue: include,
}
}

0 comments on commit 76dbf88

Please sign in to comment.