blob: c8776ac5ae9ac6d5bdc9986d7b10e670e3fc6bd5 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package options
8
9import (
10 "github.com/mongodb/mongo-go-driver/bson/primitive"
11 "time"
12)
13
14// ChangeStreamOptions represents all possible options to a change stream
15type ChangeStreamOptions struct {
16 BatchSize *int32 // The number of documents to return per batch
17 Collation *Collation // Specifies a collation
18 FullDocument *FullDocument // When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
19 MaxAwaitTime *time.Duration // The maximum amount of time for the server to wait on new documents to satisfy a change stream query
20 ResumeAfter interface{} // Specifies the logical starting point for the new change stream
21 StartAtOperationTime *primitive.Timestamp // Ensures that a change stream will only provide changes that occurred after a timestamp.
22}
23
24// ChangeStream returns a pointer to a new ChangeStreamOptions
25func ChangeStream() *ChangeStreamOptions {
26 return &ChangeStreamOptions{}
27}
28
29// SetBatchSize specifies the number of documents to return per batch
30func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions {
31 cso.BatchSize = &i
32 return cso
33}
34
35// SetCollation specifies a collation
36func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions {
37 cso.Collation = &c
38 return cso
39}
40
41// SetFullDocument specifies the fullDocument option.
42// When set to ‘updateLookup’, the change notification for partial updates will
43// include both a delta describing the changes to the document, as well as a
44// copy of the entire document that was changed from some time after the change
45// occurred.
46func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions {
47 cso.FullDocument = &fd
48 return cso
49}
50
51// SetMaxAwaitTime specifies the maximum amount of time for the server to wait on new documents to satisfy a change stream query
52func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions {
53 cso.MaxAwaitTime = &d
54 return cso
55}
56
57// SetResumeAfter specifies the logical starting point for the new change stream
58func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions {
59 cso.ResumeAfter = rt
60 return cso
61}
62
63// SetStartAtOperationTime ensures that a change stream will only provide changes that occurred after a specified timestamp.
64func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions {
65 cso.StartAtOperationTime = t
66 return cso
67}
68
69// MergeChangeStreamOptions combines the argued ChangeStreamOptions into a single ChangeStreamOptions in a last-one-wins fashion
70func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions {
71 csOpts := ChangeStream()
72 for _, cso := range opts {
73 if cso == nil {
74 continue
75 }
76 if cso.BatchSize != nil {
77 csOpts.BatchSize = cso.BatchSize
78 }
79 if cso.Collation != nil {
80 csOpts.Collation = cso.Collation
81 }
82 if cso.FullDocument != nil {
83 csOpts.FullDocument = cso.FullDocument
84 }
85 if cso.MaxAwaitTime != nil {
86 csOpts.MaxAwaitTime = cso.MaxAwaitTime
87 }
88 if cso.ResumeAfter != nil {
89 csOpts.ResumeAfter = cso.ResumeAfter
90 }
91 if cso.StartAtOperationTime != nil {
92 csOpts.StartAtOperationTime = cso.StartAtOperationTime
93 }
94 }
95
96 return csOpts
97}