seba-365 - implemented dep

Change-Id: Ia6226d50e7615935a0c8876809a687427ff88c22
diff --git a/vendor/github.com/mongodb/mongo-go-driver/mongo/change_stream.go b/vendor/github.com/mongodb/mongo-go-driver/mongo/change_stream.go
new file mode 100644
index 0000000..5330117
--- /dev/null
+++ b/vendor/github.com/mongodb/mongo-go-driver/mongo/change_stream.go
@@ -0,0 +1,508 @@
+// Copyright (C) MongoDB, Inc. 2017-present.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+package mongo
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/mongodb/mongo-go-driver/bson"
+	"github.com/mongodb/mongo-go-driver/bson/bsoncodec"
+	"github.com/mongodb/mongo-go-driver/mongo/options"
+	"github.com/mongodb/mongo-go-driver/mongo/readconcern"
+	"github.com/mongodb/mongo-go-driver/mongo/readpref"
+	"github.com/mongodb/mongo-go-driver/x/bsonx"
+	"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
+	"github.com/mongodb/mongo-go-driver/x/mongo/driver"
+	"github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
+	"github.com/mongodb/mongo-go-driver/x/network/command"
+	"github.com/mongodb/mongo-go-driver/x/network/description"
+)
+
+const errorInterrupted int32 = 11601
+const errorCappedPositionLost int32 = 136
+const errorCursorKilled int32 = 237
+
+// ErrMissingResumeToken indicates that a change stream notification from the server did not
+// contain a resume token.
+var ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
+
+// ErrNilCursor indicates that the cursor for the change stream is nil.
+var ErrNilCursor = errors.New("cursor is nil")
+
+// ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
+// Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
+// resume the change stream at a specific point in time.
+//
+// A typical usage of the ChangeStream type would be:
+type ChangeStream struct {
+	// Current is the BSON bytes of the current change document. This property is only valid until
+	// the next call to Next or Close. If continued access is required to the bson.Raw, you must
+	// make a copy of it.
+	Current bson.Raw
+
+	cmd        bsonx.Doc // aggregate command to run to create stream and rebuild cursor
+	pipeline   bsonx.Arr
+	options    *options.ChangeStreamOptions
+	coll       *Collection
+	db         *Database
+	ns         command.Namespace
+	cursor     *Cursor
+	cursorOpts bsonx.Doc
+
+	resumeToken bsonx.Doc
+	err         error
+	streamType  StreamType
+	client      *Client
+	sess        Session
+	readPref    *readpref.ReadPref
+	readConcern *readconcern.ReadConcern
+	registry    *bsoncodec.Registry
+}
+
+func (cs *ChangeStream) replaceOptions(desc description.SelectedServer) {
+	// if cs has not received any changes and resumeAfter not specified and max wire version >= 7, run known agg cmd
+	// with startAtOperationTime set to startAtOperationTime provided by user or saved from initial agg
+	// must not send resumeAfter key
+
+	// else: run known agg cmd with resumeAfter set to last known resumeToken
+	// must not set startAtOperationTime (remove if originally in cmd)
+
+	if cs.options.ResumeAfter == nil && desc.WireVersion.Max >= 7 && cs.resumeToken == nil {
+		cs.options.SetStartAtOperationTime(cs.sess.OperationTime())
+	} else {
+		if cs.resumeToken == nil {
+			return // restart stream without the resume token
+		}
+
+		cs.options.SetResumeAfter(cs.resumeToken)
+		// remove startAtOperationTime
+		cs.options.SetStartAtOperationTime(nil)
+	}
+}
+
+// Create options docs for the pipeline and cursor
+func createCmdDocs(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
+	bsonx.Doc, bsonx.Doc, error) {
+
+	pipelineDoc := bsonx.Doc{}
+	cursorDoc := bsonx.Doc{}
+	optsDoc := bsonx.Doc{}
+
+	if csType == ClientStream {
+		pipelineDoc = pipelineDoc.Append("allChangesForCluster", bsonx.Boolean(true))
+	}
+
+	if opts.BatchSize != nil {
+		cursorDoc = cursorDoc.Append("batchSize", bsonx.Int32(*opts.BatchSize))
+	}
+	if opts.Collation != nil {
+		optsDoc = optsDoc.Append("collation", bsonx.Document(opts.Collation.ToDocument()))
+	}
+	if opts.FullDocument != nil {
+		pipelineDoc = pipelineDoc.Append("fullDocument", bsonx.String(string(*opts.FullDocument)))
+	}
+	if opts.MaxAwaitTime != nil {
+		ms := int64(time.Duration(*opts.MaxAwaitTime) / time.Millisecond)
+		pipelineDoc = pipelineDoc.Append("maxAwaitTimeMS", bsonx.Int64(ms))
+	}
+	if opts.ResumeAfter != nil {
+		rt, err := transformDocument(registry, opts.ResumeAfter)
+		if err != nil {
+			return nil, nil, nil, err
+		}
+
+		pipelineDoc = pipelineDoc.Append("resumeAfter", bsonx.Document(rt))
+	}
+	if opts.StartAtOperationTime != nil {
+		pipelineDoc = pipelineDoc.Append("startAtOperationTime",
+			bsonx.Timestamp(opts.StartAtOperationTime.T, opts.StartAtOperationTime.I))
+	}
+
+	return pipelineDoc, cursorDoc, optsDoc, nil
+}
+
+func getSession(ctx context.Context, client *Client) (Session, error) {
+	sess := sessionFromContext(ctx)
+	if err := client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	var mongoSess Session
+	if sess != nil {
+		mongoSess = &sessionImpl{
+			Client: sess,
+		}
+	} else {
+		// create implicit session because it will be needed
+		newSess, err := session.NewClientSession(client.topology.SessionPool, client.id, session.Implicit)
+		if err != nil {
+			return nil, err
+		}
+
+		mongoSess = &sessionImpl{
+			Client: newSess,
+		}
+	}
+
+	return mongoSess, nil
+}
+
+func parseOptions(csType StreamType, opts *options.ChangeStreamOptions, registry *bsoncodec.Registry) (bsonx.Doc,
+	bsonx.Doc, bsonx.Doc, error) {
+
+	if opts.FullDocument == nil {
+		opts = opts.SetFullDocument(options.Default)
+	}
+
+	pipelineDoc, cursorDoc, optsDoc, err := createCmdDocs(csType, opts, registry)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	return pipelineDoc, cursorDoc, optsDoc, nil
+}
+
+func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) error {
+	ss, err := cs.client.topology.SelectServer(ctx, cs.db.writeSelector)
+	if err != nil {
+		return err
+	}
+
+	desc := ss.Description()
+	conn, err := ss.Connection(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	if replaceOptions {
+		cs.replaceOptions(desc)
+		optionsDoc, _, _, err := createCmdDocs(cs.streamType, cs.options, cs.registry)
+		if err != nil {
+			return err
+		}
+
+		changeStreamDoc := bsonx.Doc{
+			{"$changeStream", bsonx.Document(optionsDoc)},
+		}
+		cs.pipeline[0] = bsonx.Document(changeStreamDoc)
+		cs.cmd.Set("pipeline", bsonx.Array(cs.pipeline))
+	}
+
+	readCmd := command.Read{
+		DB:          cs.db.name,
+		Command:     cs.cmd,
+		Session:     cs.sess.(*sessionImpl).Client,
+		Clock:       cs.client.clock,
+		ReadPref:    cs.readPref,
+		ReadConcern: cs.readConcern,
+	}
+
+	rdr, err := readCmd.RoundTrip(ctx, desc, conn)
+	if err != nil {
+		cs.sess.EndSession(ctx)
+		return err
+	}
+
+	batchCursor, err := driver.NewBatchCursor(bsoncore.Document(rdr), readCmd.Session, readCmd.Clock, ss.Server)
+	if err != nil {
+		cs.sess.EndSession(ctx)
+		return err
+	}
+	cursor, err := newCursor(batchCursor, cs.registry)
+	if err != nil {
+		cs.sess.EndSession(ctx)
+		return err
+	}
+	cs.cursor = cursor
+
+	cursorValue, err := rdr.LookupErr("cursor")
+	if err != nil {
+		return err
+	}
+	cursorDoc := cursorValue.Document()
+	cs.ns = command.ParseNamespace(cursorDoc.Lookup("ns").StringValue())
+
+	return nil
+}
+
+func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{},
+	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
+
+	pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline)
+	if err != nil {
+		return nil, err
+	}
+
+	csOpts := options.MergeChangeStreamOptions(opts...)
+	pipelineDoc, cursorDoc, optsDoc, err := parseOptions(CollectionStream, csOpts, coll.registry)
+	if err != nil {
+		return nil, err
+	}
+	sess, err := getSession(ctx, coll.client)
+	if err != nil {
+		return nil, err
+	}
+
+	csDoc := bsonx.Document(bsonx.Doc{
+		{"$changeStream", bsonx.Document(pipelineDoc)},
+	})
+	pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
+
+	cmd := bsonx.Doc{
+		{"aggregate", bsonx.String(coll.name)},
+		{"pipeline", bsonx.Array(pipelineArr)},
+		{"cursor", bsonx.Document(cursorDoc)},
+	}
+	cmd = append(cmd, optsDoc...)
+
+	cs := &ChangeStream{
+		client:      coll.client,
+		sess:        sess,
+		cmd:         cmd,
+		pipeline:    pipelineArr,
+		coll:        coll,
+		db:          coll.db,
+		streamType:  CollectionStream,
+		readPref:    coll.readPreference,
+		readConcern: coll.readConcern,
+		options:     csOpts,
+		registry:    coll.registry,
+		cursorOpts:  cursorDoc,
+	}
+
+	err = cs.runCommand(ctx, false)
+	if err != nil {
+		return nil, err
+	}
+
+	return cs, nil
+}
+
+func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
+	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
+
+	pipelineArr, err := transformAggregatePipeline(db.registry, pipeline)
+	if err != nil {
+		return nil, err
+	}
+
+	csOpts := options.MergeChangeStreamOptions(opts...)
+	pipelineDoc, cursorDoc, optsDoc, err := parseOptions(DatabaseStream, csOpts, db.registry)
+	if err != nil {
+		return nil, err
+	}
+	sess, err := getSession(ctx, db.client)
+	if err != nil {
+		return nil, err
+	}
+
+	csDoc := bsonx.Document(bsonx.Doc{
+		{"$changeStream", bsonx.Document(pipelineDoc)},
+	})
+	pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
+
+	cmd := bsonx.Doc{
+		{"aggregate", bsonx.Int32(1)},
+		{"pipeline", bsonx.Array(pipelineArr)},
+		{"cursor", bsonx.Document(cursorDoc)},
+	}
+	cmd = append(cmd, optsDoc...)
+
+	cs := &ChangeStream{
+		client:      db.client,
+		db:          db,
+		sess:        sess,
+		cmd:         cmd,
+		pipeline:    pipelineArr,
+		streamType:  DatabaseStream,
+		readPref:    db.readPreference,
+		readConcern: db.readConcern,
+		options:     csOpts,
+		registry:    db.registry,
+		cursorOpts:  cursorDoc,
+	}
+
+	err = cs.runCommand(ctx, false)
+	if err != nil {
+		return nil, err
+	}
+
+	return cs, nil
+}
+
+func newClientChangeStream(ctx context.Context, client *Client, pipeline interface{},
+	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
+
+	pipelineArr, err := transformAggregatePipeline(client.registry, pipeline)
+	if err != nil {
+		return nil, err
+	}
+
+	csOpts := options.MergeChangeStreamOptions(opts...)
+	pipelineDoc, cursorDoc, optsDoc, err := parseOptions(ClientStream, csOpts, client.registry)
+	if err != nil {
+		return nil, err
+	}
+	sess, err := getSession(ctx, client)
+	if err != nil {
+		return nil, err
+	}
+
+	csDoc := bsonx.Document(bsonx.Doc{
+		{"$changeStream", bsonx.Document(pipelineDoc)},
+	})
+	pipelineArr = append(bsonx.Arr{csDoc}, pipelineArr...)
+
+	cmd := bsonx.Doc{
+		{"aggregate", bsonx.Int32(1)},
+		{"pipeline", bsonx.Array(pipelineArr)},
+		{"cursor", bsonx.Document(cursorDoc)},
+	}
+	cmd = append(cmd, optsDoc...)
+
+	cs := &ChangeStream{
+		client:      client,
+		db:          client.Database("admin"),
+		sess:        sess,
+		cmd:         cmd,
+		pipeline:    pipelineArr,
+		streamType:  ClientStream,
+		readPref:    client.readPreference,
+		readConcern: client.readConcern,
+		options:     csOpts,
+		registry:    client.registry,
+		cursorOpts:  cursorDoc,
+	}
+
+	err = cs.runCommand(ctx, false)
+	if err != nil {
+		return nil, err
+	}
+
+	return cs, nil
+}
+
+func (cs *ChangeStream) storeResumeToken() error {
+	idVal, err := cs.cursor.Current.LookupErr("_id")
+	if err != nil {
+		_ = cs.Close(context.Background())
+		return ErrMissingResumeToken
+	}
+
+	var idDoc bson.Raw
+	idDoc, ok := idVal.DocumentOK()
+	if !ok {
+		_ = cs.Close(context.Background())
+		return ErrMissingResumeToken
+	}
+	tokenDoc, err := bsonx.ReadDoc(idDoc)
+	if err != nil {
+		_ = cs.Close(context.Background())
+		return ErrMissingResumeToken
+	}
+
+	cs.resumeToken = tokenDoc
+	return nil
+}
+
+// ID returns the cursor ID for this change stream.
+func (cs *ChangeStream) ID() int64 {
+	if cs.cursor == nil {
+		return 0
+	}
+
+	return cs.cursor.ID()
+}
+
+// Next gets the next result from this change stream. Returns true if there were no errors and the next
+// result is available for decoding.
+func (cs *ChangeStream) Next(ctx context.Context) bool {
+	// execute in a loop to retry resume-able errors and advance the underlying cursor
+	for {
+		if cs.cursor == nil {
+			return false
+		}
+
+		if cs.cursor.Next(ctx) {
+			err := cs.storeResumeToken()
+			if err != nil {
+				cs.err = err
+				return false
+			}
+
+			cs.Current = cs.cursor.Current
+			return true
+		}
+
+		err := cs.cursor.Err()
+		if err == nil {
+			return false
+		}
+
+		switch t := err.(type) {
+		case command.Error:
+			if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled {
+				return false
+			}
+		}
+
+		killCursors := command.KillCursors{
+			NS:  cs.ns,
+			IDs: []int64{cs.ID()},
+		}
+
+		_, _ = driver.KillCursors(ctx, killCursors, cs.client.topology, cs.db.writeSelector)
+		cs.err = cs.runCommand(ctx, true)
+		if cs.err != nil {
+			return false
+		}
+	}
+}
+
+// Decode will decode the current document into val.
+func (cs *ChangeStream) Decode(out interface{}) error {
+	if cs.cursor == nil {
+		return ErrNilCursor
+	}
+
+	return bson.UnmarshalWithRegistry(cs.registry, cs.Current, out)
+}
+
+// Err returns the current error.
+func (cs *ChangeStream) Err() error {
+	if cs.err != nil {
+		return cs.err
+	}
+	if cs.cursor == nil {
+		return nil
+	}
+
+	return cs.cursor.Err()
+}
+
+// Close closes this cursor.
+func (cs *ChangeStream) Close(ctx context.Context) error {
+	if cs.cursor == nil {
+		return nil // cursor is already closed
+	}
+
+	return cs.cursor.Close(ctx)
+}
+
+// StreamType represents the type of a change stream.
+type StreamType uint8
+
+// These constants represent valid change stream types. A change stream can be initialized over a collection, all
+// collections in a database, or over a whole client.
+const (
+	CollectionStream StreamType = iota
+	DatabaseStream
+	ClientStream
+)