Skip to content

GODRIVER-3522 Add support for the rawData option for time-series bucket access. #2079

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[submodule "specifications"]
path = testdata/specifications
url = https://github.com/mongodb/specifications
url = https://github.com/qingyang-hu/specifications.git
branch = drivers3064
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,8 @@ func createFindCursor(ctx context.Context, operation *operation) (*cursorResult,
case "maxAwaitTimeMS":
maxAwaitTimeMS := time.Duration(val.Int32()) * time.Millisecond
opts.SetMaxAwaitTime(maxAwaitTimeMS)
case "rawData":
opts.SetRawBucketsData(val.Boolean())
default:
return nil, fmt.Errorf("unrecognized find option %q", key)
}
Expand Down
13 changes: 13 additions & 0 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type bulkWrite struct {
writeConcern *writeconcern.WriteConcern
result BulkWriteResult
let interface{}
rawBucketsData *bool
}

func (bw *bulkWrite) execute(ctx context.Context) error {
Expand Down Expand Up @@ -209,6 +210,10 @@ func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (opera
}
op = op.Retry(retry)

if bw.rawBucketsData != nil {
op.RawBucketsData(*bw.rawBucketsData)
}

err := op.Execute(ctx)

return op.Result(), err
Expand Down Expand Up @@ -282,6 +287,10 @@ func (bw *bulkWrite) runDelete(ctx context.Context, batch bulkWriteBatch) (opera
}
op = op.Retry(retry)

if bw.rawBucketsData != nil {
op.RawBucketsData(*bw.rawBucketsData)
}

err := op.Execute(ctx)

return op.Result(), err
Expand Down Expand Up @@ -415,6 +424,10 @@ func (bw *bulkWrite) runUpdate(ctx context.Context, batch bulkWriteBatch) (opera
}
op = op.Retry(retry)

if bw.rawBucketsData != nil {
op.RawBucketsData(*bw.rawBucketsData)
}

err := op.Execute(ctx)

return op.Result(), err
Expand Down
1 change: 1 addition & 0 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ func (c *Client) BulkWrite(ctx context.Context, writes []ClientBulkWrite,
client: c,
selector: selector,
writeConcern: wc,
rawBucketsData: bwo.RawBucketsData,
}
if bwo.VerboseResults == nil || !(*bwo.VerboseResults) {
op.errorsOnly = true
Expand Down
4 changes: 4 additions & 0 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type clientBulkWrite struct {
client *Client
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
rawBucketsData *bool

result ClientBulkWriteResult
}
Expand Down Expand Up @@ -143,6 +144,9 @@ func (bw *clientBulkWrite) newCommand() func([]byte, description.SelectedServer)
}
dst = bsoncore.AppendDocumentElement(dst, "let", let)
}
if bw.rawBucketsData != nil {
dst = bsoncore.AppendBooleanElement(dst, "rawData", *bw.rawBucketsData)
}
return dst, nil
}
}
Expand Down
49 changes: 45 additions & 4 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
selector: selector,
writeConcern: wc,
let: args.Let,
rawBucketsData: args.RawBucketsData,
}

err = op.execute(ctx)
Expand Down Expand Up @@ -324,6 +325,9 @@ func (coll *Collection) insert(
if args.Ordered != nil {
op = op.Ordered(*args.Ordered)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}
retry := driver.RetryNone
if coll.client.retryWrites {
retry = driver.RetryOncePerCommand
Expand Down Expand Up @@ -375,6 +379,9 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
if args.Comment != nil {
imOpts.SetComment(args.Comment)
}
if args.RawBucketsData != nil {
imOpts = imOpts.SetRawBucketsData(*args.RawBucketsData)
}
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
Expand Down Expand Up @@ -534,6 +541,9 @@ func (coll *Collection) delete(
}
op = op.Let(let)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

// deleteMany cannot be retried
retryMode := driver.RetryNone
Expand Down Expand Up @@ -571,10 +581,11 @@ func (coll *Collection) DeleteOne(
return nil, fmt.Errorf("failed to construct options from builder: %w", err)
}
deleteOptions := &options.DeleteManyOptions{
Collation: args.Collation,
Comment: args.Comment,
Hint: args.Hint,
Let: args.Let,
Collation: args.Collation,
Comment: args.Comment,
Hint: args.Hint,
Let: args.Let,
RawBucketsData: args.RawBucketsData,
}

return coll.delete(ctx, filter, true, rrOne, deleteOptions)
Expand Down Expand Up @@ -681,6 +692,9 @@ func (coll *Collection) updateOrReplace(
}
op = op.Comment(comment)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}
retry := driver.RetryNone
// retryable writes are only enabled updateOne/replaceOne operations
if !multi && coll.client.retryWrites {
Expand Down Expand Up @@ -775,6 +789,7 @@ func (coll *Collection) UpdateOne(
Hint: args.Hint,
Upsert: args.Upsert,
Let: args.Let,
RawBucketsData: args.RawBucketsData,
}

return coll.updateOrReplace(ctx, f, update, false, rrOne, true, args.Sort, updateOptions)
Expand Down Expand Up @@ -865,6 +880,7 @@ func (coll *Collection) ReplaceOne(
Hint: args.Hint,
Let: args.Let,
Comment: args.Comment,
RawBucketsData: args.RawBucketsData,
}

return coll.updateOrReplace(ctx, f, r, false, rrOne, false, args.Sort, updateOptions)
Expand Down Expand Up @@ -1036,6 +1052,9 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
}
op.CustomOptions(customOptions)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

retry := driver.RetryNone
if a.retryRead && !hasOutputStage {
Expand Down Expand Up @@ -1124,6 +1143,9 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
}
op.Hint(hintVal)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}
retry := driver.RetryNone
if coll.client.retryReads {
retry = driver.RetryOncePerCommand
Expand Down Expand Up @@ -1205,6 +1227,9 @@ func (coll *Collection) EstimatedDocumentCount(
}
op = op.Comment(comment)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

retry := driver.RetryNone
if coll.client.retryReads {
Expand Down Expand Up @@ -1294,6 +1319,9 @@ func (coll *Collection) Distinct(
}
op.Hint(hint)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}
retry := driver.RetryNone
if coll.client.retryReads {
retry = driver.RetryOncePerCommand
Expand Down Expand Up @@ -1497,6 +1525,9 @@ func (coll *Collection) find(
}
op.Sort(sort)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}
retry := driver.RetryNone
if coll.client.retryReads {
retry = driver.RetryOncePerCommand
Expand Down Expand Up @@ -1530,6 +1561,7 @@ func newFindArgsFromFindOneArgs(args *options.FindOneOptions) *options.FindOptio
v.ShowRecordID = args.ShowRecordID
v.Skip = args.Skip
v.Sort = args.Sort
v.RawBucketsData = args.RawBucketsData
}
return v
}
Expand Down Expand Up @@ -1692,6 +1724,9 @@ func (coll *Collection) FindOneAndDelete(
}
op = op.Let(let)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

return coll.findAndModify(ctx, op)
}
Expand Down Expand Up @@ -1789,6 +1824,9 @@ func (coll *Collection) FindOneAndReplace(
}
op = op.Let(let)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

return coll.findAndModify(ctx, op)
}
Expand Down Expand Up @@ -1898,6 +1936,9 @@ func (coll *Collection) FindOneAndUpdate(
}
op = op.Let(let)
}
if args.RawBucketsData != nil {
op = op.RawBucketsData(*args.RawBucketsData)
}

return coll.findAndModify(ctx, op)
}
Expand Down
13 changes: 13 additions & 0 deletions mongo/options/aggregateoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type AggregateOptions struct {
Hint interface{}
Let interface{}
Custom bson.M
RawBucketsData *bool
}

// AggregateOptionsBuilder contains options to configure aggregate operations.
Expand Down Expand Up @@ -163,3 +164,15 @@ func (ao *AggregateOptionsBuilder) SetCustom(c bson.M) *AggregateOptionsBuilder

return ao
}

// SetRawBucketsData sets the value for the RawBucketsData field. If true, it allows the CRUD operations to access timeseries
// collections on the bucket-level. This option is only valid for MongoDB versions >= 9.0. The default value is false.
func (ao *AggregateOptionsBuilder) SetRawBucketsData(rawBucketsData bool) *AggregateOptionsBuilder {
ao.Opts = append(ao.Opts, func(opts *AggregateOptions) error {
opts.RawBucketsData = &rawBucketsData

return nil
})

return ao
}
13 changes: 13 additions & 0 deletions mongo/options/bulkwriteoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type BulkWriteOptions struct {
Comment interface{}
Ordered *bool
Let interface{}
RawBucketsData *bool
}

// BulkWriteOptionsBuilder contains options to configure bulk write operations.
Expand Down Expand Up @@ -92,3 +93,15 @@ func (b *BulkWriteOptionsBuilder) SetLet(let interface{}) *BulkWriteOptionsBuild

return b
}

// SetRawBucketsData sets the value for the RawBucketsData field. If true, it allows the CRUD operations to access timeseries
// collections on the bucket-level. This option is only valid for MongoDB versions >= 9.0. The default value is false.
func (b *BulkWriteOptionsBuilder) SetRawBucketsData(rawBucketsData bool) *BulkWriteOptionsBuilder {
b.Opts = append(b.Opts, func(opts *BulkWriteOptions) error {
opts.RawBucketsData = &rawBucketsData

return nil
})

return b
}
13 changes: 13 additions & 0 deletions mongo/options/clientbulkwriteoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type ClientBulkWriteOptions struct {
Ordered *bool
Let interface{}
WriteConcern *writeconcern.WriteConcern
RawBucketsData *bool
VerboseResults *bool
}

Expand Down Expand Up @@ -108,6 +109,18 @@ func (b *ClientBulkWriteOptionsBuilder) SetWriteConcern(wc *writeconcern.WriteCo
return b
}

// SetRawBucketsData sets the value for the RawBucketsData field. If true, it allows the CRUD operations to access timeseries
// collections on the bucket-level. This option is only valid for MongoDB versions >= 9.0. The default value is false.
func (b *ClientBulkWriteOptionsBuilder) SetRawBucketsData(rawBucketsData bool) *ClientBulkWriteOptionsBuilder {
b.Opts = append(b.Opts, func(opts *ClientBulkWriteOptions) error {
opts.RawBucketsData = &rawBucketsData

return nil
})

return b
}

// SetVerboseResults sets the value for the VerboseResults field. Specifies whether detailed
// results for each successful operation should be included in the returned BulkWriteResult.
// The defaults value is false.
Expand Down
23 changes: 18 additions & 5 deletions mongo/options/countoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ package options
//
// See corresponding setter methods for documentation.
type CountOptions struct {
Collation *Collation
Comment interface{}
Hint interface{}
Limit *int64
Skip *int64
Collation *Collation
Comment interface{}
Hint interface{}
Limit *int64
Skip *int64
RawBucketsData *bool
}

// CountOptionsBuilder contains options to configure count operations. Each
Expand Down Expand Up @@ -99,3 +100,15 @@ func (co *CountOptionsBuilder) SetSkip(i int64) *CountOptionsBuilder {

return co
}

// SetRawBucketsData sets the value for the RawBucketsData field. If true, it allows the CRUD operations to access timeseries
// collections on the bucket-level. This option is only valid for MongoDB versions >= 9.0. The default value is false.
func (co *CountOptionsBuilder) SetRawBucketsData(rawBucketsData bool) *CountOptionsBuilder {
co.Opts = append(co.Opts, func(opts *CountOptions) error {
opts.RawBucketsData = &rawBucketsData

return nil
})

return co
}
Loading