// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10 "context"
11 "errors"
12 "time"
13
14 "github.com/mongodb/mongo-go-driver/bson"
15 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
16 "github.com/mongodb/mongo-go-driver/mongo/options"
17 "github.com/mongodb/mongo-go-driver/mongo/readconcern"
18 "github.com/mongodb/mongo-go-driver/mongo/readpref"
19 "github.com/mongodb/mongo-go-driver/x/bsonx"
20 "github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
21 "github.com/mongodb/mongo-go-driver/x/mongo/driver"
22 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
23 "github.com/mongodb/mongo-go-driver/x/network/command"
24 "github.com/mongodb/mongo-go-driver/x/network/description"
25)
26
// Server error codes for which Next gives up instead of attempting to resume the change stream.
const (
	errorInterrupted        int32 = 11601
	errorCappedPositionLost int32 = 136
	errorCursorKilled       int32 = 237
)
30
// Sentinel errors reported by ChangeStream.
var (
	// ErrMissingResumeToken is returned when a change document received from the server does not
	// carry a usable resume token, which makes resuming the stream impossible.
	ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")

	// ErrNilCursor is returned when an operation needs the change stream's cursor but it is nil.
	ErrNilCursor = errors.New("cursor is nil")
)
37
38// ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
39// Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
40// resume the change stream at a specific point in time.
41//
42// A typical usage of the ChangeStream type would be:
43type ChangeStream struct {
44 // Current is the BSON bytes of the current change document. This property is only valid until
45 // the next call to Next or Close. If continued access is required to the bson.Raw, you must
46 // make a copy of it.
47 Current bson.Raw
48
49 cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
50 pipeline bsonx.Arr
51 options *options.ChangeStreamOptions
52 coll *Collection
53 db *Database
54 ns command.Namespace
55 cursor *Cursor
56 cursorOpts bsonx.Doc
57
58 resumeToken bsonx.Doc
59 err error
60 streamType StreamType
61 client *Client
62 sess Session
63 readPref *readpref.ReadPref
64 readConcern *readconcern.ReadConcern
65 registry *bsoncodec.Registry
66}
67
68func (cs *ChangeStream) replaceOptions(desc description.SelectedServer) {
69 // if cs has not received any changes and resumeAfter not specified and max wire version >= 7, run known agg cmd
70 // with startAtOperationTime set to startAtOperationTime provided by user or saved from initial agg
71 // must not send resumeAfter key
72
73 // else: run known agg cmd with resumeAfter set to last known resumeToken
74 // must not set startAtOperationTime (remove if originally in cmd)
75
76 if cs.options.ResumeAfter == nil && desc.WireVersion.Max >= 7 && cs.resumeToken == nil {
77 cs.options.SetStartAtOperationTime(cs.sess.OperationTime())
78 } else {
79 if cs.resumeToken == nil {
80 return // restart stream without the resume token
81 }
82
83 cs.options.SetResumeAfter(cs.resumeToken)
84 // remove startAtOperationTime
85 cs.options.SetStartAtOperationTime(nil)
86 }
87}
88
89// Create options docs for the pipeline and cursor
90func createCmdDocs(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
91 bsonx.Doc, bsonx.Doc, error) {
92
93 pipelineDoc := bsonx.Doc{}
94 cursorDoc := bsonx.Doc{}
95 optsDoc := bsonx.Doc{}
96
97 if csType == ClientStream {
98 pipelineDoc = pipelineDoc.Append("allChangesForCluster", bsonx.Boolean(true))
99 }
100
101 if opts.BatchSize != nil {
102 cursorDoc = cursorDoc.Append("batchSize", bsonx.Int32(*opts.BatchSize))
103 }
104 if opts.Collation != nil {
105 optsDoc = optsDoc.Append("collation", bsonx.Document(opts.Collation.ToDocument()))
106 }
107 if opts.FullDocument != nil {
108 pipelineDoc = pipelineDoc.Append("fullDocument", bsonx.String(string(*opts.FullDocument)))
109 }
110 if opts.MaxAwaitTime != nil {
111 ms := int64(time.Duration(*opts.MaxAwaitTime) / time.Millisecond)
112 pipelineDoc = pipelineDoc.Append("maxAwaitTimeMS", bsonx.Int64(ms))
113 }
114 if opts.ResumeAfter != nil {
115 rt, err := transformDocument(registry, opts.ResumeAfter)
116 if err != nil {
117 return nil, nil, nil, err
118 }
119
120 pipelineDoc = pipelineDoc.Append("resumeAfter", bsonx.Document(rt))
121 }
122 if opts.StartAtOperationTime != nil {
123 pipelineDoc = pipelineDoc.Append("startAtOperationTime",
124 bsonx.Timestamp(opts.StartAtOperationTime.T, opts.StartAtOperationTime.I))
125 }
126
127 return pipelineDoc, cursorDoc, optsDoc, nil
128}
129
130func getSession(ctx context.Context, client *Client) (Session, error) {
131 sess := sessionFromContext(ctx)
132 if err := client.ValidSession(sess); err != nil {
133 return nil, err
134 }
135
136 var mongoSess Session
137 if sess != nil {
138 mongoSess = &sessionImpl{
139 Client: sess,
140 }
141 } else {
142 // create implicit session because it will be needed
143 newSess, err := session.NewClientSession(client.topology.SessionPool, client.id, session.Implicit)
144 if err != nil {
145 return nil, err
146 }
147
148 mongoSess = &sessionImpl{
149 Client: newSess,
150 }
151 }
152
153 return mongoSess, nil
154}
155
156func parseOptions(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
157 bsonx.Doc, bsonx.Doc, error) {
158
159 if opts.FullDocument == nil {
160 opts = opts.SetFullDocument(options.Default)
161 }
162
163 pipelineDoc, cursorDoc, optsDoc, err := createCmdDocs(csType, opts, registry)
164 if err != nil {
165 return nil, nil, nil, err
166 }
167
168 return pipelineDoc, cursorDoc, optsDoc, nil
169}
170
171func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) error {
172 ss, err := cs.client.topology.SelectServer(ctx, cs.db.writeSelector)
173 if err != nil {
174 return err
175 }
176
177 desc := ss.Description()
178 conn, err := ss.Connection(ctx)
179 if err != nil {
180 return err
181 }
182 defer conn.Close()
183
184 if replaceOptions {
185 cs.replaceOptions(desc)
186 optionsDoc, _, _, err := createCmdDocs(cs.streamType, cs.options, cs.registry)
187 if err != nil {
188 return err
189 }
190
191 changeStreamDoc := bsonx.Doc{
192 {"$changeStream", bsonx.Document(optionsDoc)},
193 }
194 cs.pipeline[0] = bsonx.Document(changeStreamDoc)
195 cs.cmd.Set("pipeline", bsonx.Array(cs.pipeline))
196 }
197
198 readCmd := command.Read{
199 DB: cs.db.name,
200 Command: cs.cmd,
201 Session: cs.sess.(*sessionImpl).Client,
202 Clock: cs.client.clock,
203 ReadPref: cs.readPref,
204 ReadConcern: cs.readConcern,
205 }
206
207 rdr, err := readCmd.RoundTrip(ctx, desc, conn)
208 if err != nil {
209 cs.sess.EndSession(ctx)
210 return err
211 }
212
213 batchCursor, err := driver.NewBatchCursor(bsoncore.Document(rdr), readCmd.Session, readCmd.Clock, ss.Server)
214 if err != nil {
215 cs.sess.EndSession(ctx)
216 return err
217 }
218 cursor, err := newCursor(batchCursor, cs.registry)
219 if err != nil {
220 cs.sess.EndSession(ctx)
221 return err
222 }
223 cs.cursor = cursor
224
225 cursorValue, err := rdr.LookupErr("cursor")
226 if err != nil {
227 return err
228 }
229 cursorDoc := cursorValue.Document()
230 cs.ns = command.ParseNamespace(cursorDoc.Lookup("ns").StringValue())
231
232 return nil
233}
234
235func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{},
236 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
237
238 pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline)
239 if err != nil {
240 return nil, err
241 }
242
243 csOpts := options.MergeChangeStreamOptions(opts...)
244 pipelineDoc, cursorDoc, optsDoc, err := parseOptions(CollectionStream, csOpts, coll.registry)
245 if err != nil {
246 return nil, err
247 }
248 sess, err := getSession(ctx, coll.client)
249 if err != nil {
250 return nil, err
251 }
252
253 csDoc := bsonx.Document(bsonx.Doc{
254 {"$changeStream", bsonx.Document(pipelineDoc)},
255 })
256 pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
257
258 cmd := bsonx.Doc{
259 {"aggregate", bsonx.String(coll.name)},
260 {"pipeline", bsonx.Array(pipelineArr)},
261 {"cursor", bsonx.Document(cursorDoc)},
262 }
263 cmd = append(cmd, optsDoc...)
264
265 cs := &ChangeStream{
266 client: coll.client,
267 sess: sess,
268 cmd: cmd,
269 pipeline: pipelineArr,
270 coll: coll,
271 db: coll.db,
272 streamType: CollectionStream,
273 readPref: coll.readPreference,
274 readConcern: coll.readConcern,
275 options: csOpts,
276 registry: coll.registry,
277 cursorOpts: cursorDoc,
278 }
279
280 err = cs.runCommand(ctx, false)
281 if err != nil {
282 return nil, err
283 }
284
285 return cs, nil
286}
287
288func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
289 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
290
291 pipelineArr, err := transformAggregatePipeline(db.registry, pipeline)
292 if err != nil {
293 return nil, err
294 }
295
296 csOpts := options.MergeChangeStreamOptions(opts...)
297 pipelineDoc, cursorDoc, optsDoc, err := parseOptions(DatabaseStream, csOpts, db.registry)
298 if err != nil {
299 return nil, err
300 }
301 sess, err := getSession(ctx, db.client)
302 if err != nil {
303 return nil, err
304 }
305
306 csDoc := bsonx.Document(bsonx.Doc{
307 {"$changeStream", bsonx.Document(pipelineDoc)},
308 })
309 pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
310
311 cmd := bsonx.Doc{
312 {"aggregate", bsonx.Int32(1)},
313 {"pipeline", bsonx.Array(pipelineArr)},
314 {"cursor", bsonx.Document(cursorDoc)},
315 }
316 cmd = append(cmd, optsDoc...)
317
318 cs := &ChangeStream{
319 client: db.client,
320 db: db,
321 sess: sess,
322 cmd: cmd,
323 pipeline: pipelineArr,
324 streamType: DatabaseStream,
325 readPref: db.readPreference,
326 readConcern: db.readConcern,
327 options: csOpts,
328 registry: db.registry,
329 cursorOpts: cursorDoc,
330 }
331
332 err = cs.runCommand(ctx, false)
333 if err != nil {
334 return nil, err
335 }
336
337 return cs, nil
338}
339
340func newClientChangeStream(ctx context.Context, client *Client, pipeline interface{},
341 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
342
343 pipelineArr, err := transformAggregatePipeline(client.registry, pipeline)
344 if err != nil {
345 return nil, err
346 }
347
348 csOpts := options.MergeChangeStreamOptions(opts...)
349 pipelineDoc, cursorDoc, optsDoc, err := parseOptions(ClientStream, csOpts, client.registry)
350 if err != nil {
351 return nil, err
352 }
353 sess, err := getSession(ctx, client)
354 if err != nil {
355 return nil, err
356 }
357
358 csDoc := bsonx.Document(bsonx.Doc{
359 {"$changeStream", bsonx.Document(pipelineDoc)},
360 })
361 pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
362
363 cmd := bsonx.Doc{
364 {"aggregate", bsonx.Int32(1)},
365 {"pipeline", bsonx.Array(pipelineArr)},
366 {"cursor", bsonx.Document(cursorDoc)},
367 }
368 cmd = append(cmd, optsDoc...)
369
370 cs := &ChangeStream{
371 client: client,
372 db: client.Database("admin"),
373 sess: sess,
374 cmd: cmd,
375 pipeline: pipelineArr,
376 streamType: ClientStream,
377 readPref: client.readPreference,
378 readConcern: client.readConcern,
379 options: csOpts,
380 registry: client.registry,
381 cursorOpts: cursorDoc,
382 }
383
384 err = cs.runCommand(ctx, false)
385 if err != nil {
386 return nil, err
387 }
388
389 return cs, nil
390}
391
392func (cs *ChangeStream) storeResumeToken() error {
393 idVal, err := cs.cursor.Current.LookupErr("_id")
394 if err != nil {
395 _ = cs.Close(context.Background())
396 return ErrMissingResumeToken
397 }
398
399 var idDoc bson.Raw
400 idDoc, ok := idVal.DocumentOK()
401 if !ok {
402 _ = cs.Close(context.Background())
403 return ErrMissingResumeToken
404 }
405 tokenDoc, err := bsonx.ReadDoc(idDoc)
406 if err != nil {
407 _ = cs.Close(context.Background())
408 return ErrMissingResumeToken
409 }
410
411 cs.resumeToken = tokenDoc
412 return nil
413}
414
415// ID returns the cursor ID for this change stream.
416func (cs *ChangeStream) ID() int64 {
417 if cs.cursor == nil {
418 return 0
419 }
420
421 return cs.cursor.ID()
422}
423
424// Next gets the next result from this change stream. Returns true if there were no errors and the next
425// result is available for decoding.
426func (cs *ChangeStream) Next(ctx context.Context) bool {
427 // execute in a loop to retry resume-able errors and advance the underlying cursor
428 for {
429 if cs.cursor == nil {
430 return false
431 }
432
433 if cs.cursor.Next(ctx) {
434 err := cs.storeResumeToken()
435 if err != nil {
436 cs.err = err
437 return false
438 }
439
440 cs.Current = cs.cursor.Current
441 return true
442 }
443
444 err := cs.cursor.Err()
445 if err == nil {
446 return false
447 }
448
449 switch t := err.(type) {
450 case command.Error:
451 if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled {
452 return false
453 }
454 }
455
456 killCursors := command.KillCursors{
457 NS: cs.ns,
458 IDs: []int64{cs.ID()},
459 }
460
461 _, _ = driver.KillCursors(ctx, killCursors, cs.client.topology, cs.db.writeSelector)
462 cs.err = cs.runCommand(ctx, true)
463 if cs.err != nil {
464 return false
465 }
466 }
467}
468
469// Decode will decode the current document into val.
470func (cs *ChangeStream) Decode(out interface{}) error {
471 if cs.cursor == nil {
472 return ErrNilCursor
473 }
474
475 return bson.UnmarshalWithRegistry(cs.registry, cs.Current, out)
476}
477
478// Err returns the current error.
479func (cs *ChangeStream) Err() error {
480 if cs.err != nil {
481 return cs.err
482 }
483 if cs.cursor == nil {
484 return nil
485 }
486
487 return cs.cursor.Err()
488}
489
490// Close closes this cursor.
491func (cs *ChangeStream) Close(ctx context.Context) error {
492 if cs.cursor == nil {
493 return nil // cursor is already closed
494 }
495
496 return cs.cursor.Close(ctx)
497}
498
// StreamType identifies what a change stream is opened over.
type StreamType uint8

// Valid change stream types.
const (
	// CollectionStream is a change stream over a single collection.
	CollectionStream StreamType = iota
	// DatabaseStream is a change stream over all collections in a database.
	DatabaseStream
	// ClientStream is a change stream over a whole client.
	ClientStream
)