blob: c8776ac5ae9ac6d5bdc9986d7b10e670e3fc6bd5 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package options
import (
"github.com/mongodb/mongo-go-driver/bson/primitive"
"time"
)
// ChangeStreamOptions represents all possible options to a change stream
type ChangeStreamOptions struct {
BatchSize *int32 // The number of documents to return per batch
Collation *Collation // Specifies a collation
FullDocument *FullDocument // When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
MaxAwaitTime *time.Duration // The maximum amount of time for the server to wait on new documents to satisfy a change stream query
ResumeAfter interface{} // Specifies the logical starting point for the new change stream
StartAtOperationTime *primitive.Timestamp // Ensures that a change stream will only provide changes that occurred after a timestamp.
}
// ChangeStream returns a pointer to a new ChangeStreamOptions
func ChangeStream() *ChangeStreamOptions {
return &ChangeStreamOptions{}
}
// SetBatchSize specifies the number of documents to return per batch
func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions {
cso.BatchSize = &i
return cso
}
// SetCollation specifies a collation
func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions {
cso.Collation = &c
return cso
}
// SetFullDocument specifies the fullDocument option.
// When set to ‘updateLookup’, the change notification for partial updates will
// include both a delta describing the changes to the document, as well as a
// copy of the entire document that was changed from some time after the change
// occurred.
func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions {
cso.FullDocument = &fd
return cso
}
// SetMaxAwaitTime specifies the maximum amount of time for the server to wait on new documents to satisfy a change stream query
func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions {
cso.MaxAwaitTime = &d
return cso
}
// SetResumeAfter specifies the logical starting point for the new change stream
func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions {
cso.ResumeAfter = rt
return cso
}
// SetStartAtOperationTime ensures that a change stream will only provide changes that occurred after a specified timestamp.
func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions {
cso.StartAtOperationTime = t
return cso
}
// MergeChangeStreamOptions combines the argued ChangeStreamOptions into a single ChangeStreamOptions in a last-one-wins fashion
func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions {
csOpts := ChangeStream()
for _, cso := range opts {
if cso == nil {
continue
}
if cso.BatchSize != nil {
csOpts.BatchSize = cso.BatchSize
}
if cso.Collation != nil {
csOpts.Collation = cso.Collation
}
if cso.FullDocument != nil {
csOpts.FullDocument = cso.FullDocument
}
if cso.MaxAwaitTime != nil {
csOpts.MaxAwaitTime = cso.MaxAwaitTime
}
if cso.ResumeAfter != nil {
csOpts.ResumeAfter = cso.ResumeAfter
}
if cso.StartAtOperationTime != nil {
csOpts.StartAtOperationTime = cso.StartAtOperationTime
}
}
return csOpts
}