From 76dbf88956567f7b1c938cf9db82d4314f207cc4 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 5 Mar 2025 17:18:28 +0800 Subject: [PATCH] feat: support `IncludeValue` for get operation --- oxia/async_client_impl.go | 10 +++++---- oxia/async_client_impl_test.go | 38 ++++++++++++++++++++++++++++++++++ oxia/internal/model/model.go | 3 ++- oxia/options_get.go | 20 +++++++++++++++++- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/oxia/async_client_impl.go b/oxia/async_client_impl.go index 8e543e9d..ba3bce98 100644 --- a/oxia/async_client_impl.go +++ b/oxia/async_client_impl.go @@ -237,7 +237,7 @@ func (c *clientImpl) Get(key string, options ...GetOption) <-chan GetResult { if opts.comparisonType == proto.KeyComparisonType_EQUAL || opts.partitionKey != nil { c.doSingleShardGet(key, opts, ch) } else { - c.doFloorCeilingGet(key, opts.comparisonType, ch) + c.doFloorCeilingGet(key, opts, ch) } return ch @@ -248,6 +248,7 @@ func (c *clientImpl) doSingleShardGet(key string, opts *getOptions, ch chan GetR c.readBatchManager.Get(shardId).Add(model.GetCall{ Key: key, ComparisonType: opts.comparisonType, + IncludeValue: opts.includeValue, Callback: func(response *proto.GetResponse, err error) { ch <- toGetResult(response, key, err) close(ch) @@ -256,7 +257,7 @@ func (c *clientImpl) doSingleShardGet(key string, opts *getOptions, ch chan GetR } // The keys might get hashed to multiple shards, so we have to check on all shards and then compare the results. -func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyComparisonType, ch chan GetResult) { +func (c *clientImpl) doFloorCeilingGet(key string, options *getOptions, ch chan GetResult) { m := sync.Mutex{} var results []*proto.GetResponse shards := c.shardManager.GetAll() @@ -265,7 +266,8 @@ func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyCompa for _, shardId := range shards { c.readBatchManager.Get(shardId).Add(model.GetCall{ Key: key, - ComparisonType: comparisonType, + ComparisonType: options.comparisonType, + IncludeValue: options.includeValue, Callback: func(response *proto.GetResponse, err error) { m.Lock() defer m.Unlock() @@ -283,7 +285,7 @@ func (c *clientImpl) doFloorCeilingGet(key string, comparisonType proto.KeyCompa counter-- if counter == 0 { // We have responses from all the shards - processAllGetResponses(key, results, comparisonType, ch) + processAllGetResponses(key, results, options.comparisonType, ch) } }, }) diff --git a/oxia/async_client_impl_test.go b/oxia/async_client_impl_test.go index c22bca8b..2d84a8e6 100644 --- a/oxia/async_client_impl_test.go +++ b/oxia/async_client_impl_test.go @@ -920,3 +920,41 @@ func TestGetValueWithSessionId(t *testing.T) { assert.NotEqualValues(t, 0, r0.Version.SessionId) assert.EqualValues(t, r1.Version.SessionId, r0.Version.SessionId) } + +func TestGetWithoutValue(t *testing.T) { + standaloneServer, err := server.NewStandalone(server.NewTestConfig(t.TempDir())) + assert.NoError(t, err) + + serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort()) + client, err := NewAsyncClient(serviceAddress) + assert.NoError(t, err) + + key := "stream" + + var keys []string + + putResult := <-client.Put(key, []byte("0"), PartitionKey(key), SequenceKeysDeltas(1)) + assert.NotNil(t, putResult.Key) + assert.NoError(t, putResult.Err) + keys = append(keys, putResult.Key) + + putResult = <-client.Put(key, []byte("1"), PartitionKey(key), SequenceKeysDeltas(1)) + assert.NotNil(t, putResult.Key) + assert.NoError(t, putResult.Err) + keys = append(keys, putResult.Key) + + for _, subKey := range keys { + result := <-client.Get(subKey, PartitionKey(key), IncludeValue(true)) + assert.NotNil(t, result.Value) + result = <-client.Get(subKey, PartitionKey(key), IncludeValue(false)) + assert.Nil(t, result.Value) + } + + result := <-client.Get(keys[0], PartitionKey(key), IncludeValue(false), ComparisonHigher()) + assert.Nil(t, result.Value) + assert.Equal(t, result.Key, keys[1]) + + result = <-client.Get(keys[1], PartitionKey(key), IncludeValue(false), ComparisonLower()) + assert.Nil(t, result.Value) + assert.Equal(t, result.Key, keys[0]) +} diff --git a/oxia/internal/model/model.go b/oxia/internal/model/model.go index 01ea81f4..8f01b8c0 100644 --- a/oxia/internal/model/model.go +++ b/oxia/internal/model/model.go @@ -45,6 +45,7 @@ type DeleteRangeCall struct { type GetCall struct { Key string ComparisonType proto.KeyComparisonType + IncludeValue bool Callback func(*proto.GetResponse, error) } @@ -79,7 +80,7 @@ func (r GetCall) ToProto() *proto.GetRequest { return &proto.GetRequest{ Key: r.Key, ComparisonType: r.ComparisonType, - IncludeValue: true, + IncludeValue: r.IncludeValue, } } diff --git a/oxia/options_get.go b/oxia/options_get.go index 90226ccd..516fa796 100644 --- a/oxia/options_get.go +++ b/oxia/options_get.go @@ -19,6 +19,7 @@ import "github.com/streamnative/oxia/proto" type getOptions struct { baseOptions comparisonType proto.KeyComparisonType + includeValue bool } // GetOption represents an option for the [SyncClient.Get] operation. @@ -27,7 +28,9 @@ type GetOption interface { } func newGetOptions(opts []GetOption) *getOptions { - getOpts := &getOptions{} + getOpts := &getOptions{ + includeValue: true, + } for _, opt := range opts { opt.applyGet(getOpts) } @@ -70,3 +73,18 @@ func ComparisonLower() GetOption { func ComparisonHigher() GetOption { return &getComparisonType{proto.KeyComparisonType_HIGHER} } + +type includeValue struct { + includeValue bool +} + +func (t *includeValue) applyGet(opts *getOptions) { + opts.includeValue = t.includeValue +} + +// IncludeValue is a function that creates a GetOption for including or excluding a value. +func IncludeValue(include bool) GetOption { + return &includeValue{ + includeValue: include, + } +}