seba-365 - implemented dep
Change-Id: Ia6226d50e7615935a0c8876809a687427ff88c22
diff --git a/vendor/github.com/mongodb/mongo-go-driver/mongo/client.go b/vendor/github.com/mongodb/mongo-go-driver/mongo/client.go
new file mode 100644
index 0000000..7984bc0
--- /dev/null
+++ b/vendor/github.com/mongodb/mongo-go-driver/mongo/client.go
@@ -0,0 +1,454 @@
+// Copyright (C) MongoDB, Inc. 2017-present.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+package mongo
+
+import (
+ "context"
+ "time"
+
+ "github.com/mongodb/mongo-go-driver/bson"
+ "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
+ "github.com/mongodb/mongo-go-driver/mongo/options"
+ "github.com/mongodb/mongo-go-driver/mongo/readconcern"
+ "github.com/mongodb/mongo-go-driver/mongo/readpref"
+ "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
+ "github.com/mongodb/mongo-go-driver/tag"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
+ "github.com/mongodb/mongo-go-driver/x/network/command"
+ "github.com/mongodb/mongo-go-driver/x/network/connstring"
+ "github.com/mongodb/mongo-go-driver/x/network/description"
+)
+
+const defaultLocalThreshold = 15 * time.Millisecond
+
+// Client performs operations on a given topology.
+type Client struct {
+ id uuid.UUID
+ topologyOptions []topology.Option
+ topology *topology.Topology
+ connString connstring.ConnString
+ localThreshold time.Duration
+ retryWrites bool
+ clock *session.ClusterClock
+ readPreference *readpref.ReadPref
+ readConcern *readconcern.ReadConcern
+ writeConcern *writeconcern.WriteConcern
+ registry *bsoncodec.Registry
+ marshaller BSONAppender
+}
+
+// Connect creates a new Client and then initializes it using the Connect method.
+func Connect(ctx context.Context, uri string, opts ...*options.ClientOptions) (*Client, error) {
+ c, err := NewClientWithOptions(uri, opts...)
+ if err != nil {
+ return nil, err
+ }
+ err = c.Connect(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return c, nil
+}
+
+// NewClient creates a new client to connect to a cluster specified by the uri.
+func NewClient(uri string) (*Client, error) {
+ cs, err := connstring.Parse(uri)
+ if err != nil {
+ return nil, err
+ }
+
+ return newClient(cs)
+}
+
+// NewClientWithOptions creates a new client to connect to to a cluster specified by the connection
+// string and the options manually passed in. If the same option is configured in both the
+// connection string and the manual options, the manual option will be ignored.
+func NewClientWithOptions(uri string, opts ...*options.ClientOptions) (*Client, error) {
+ cs, err := connstring.Parse(uri)
+ if err != nil {
+ return nil, err
+ }
+
+ return newClient(cs, opts...)
+}
+
+// Connect initializes the Client by starting background monitoring goroutines.
+// This method must be called before a Client can be used.
+func (c *Client) Connect(ctx context.Context) error {
+ err := c.topology.Connect(ctx)
+ if err != nil {
+ return replaceTopologyErr(err)
+ }
+
+ return nil
+
+}
+
+// Disconnect closes sockets to the topology referenced by this Client. It will
+// shut down any monitoring goroutines, close the idle connection pool, and will
+// wait until all the in use connections have been returned to the connection
+// pool and closed before returning. If the context expires via cancellation,
+// deadline, or timeout before the in use connections have returned, the in use
+// connections will be closed, resulting in the failure of any in flight read
+// or write operations. If this method returns with no errors, all connections
+// associated with this Client have been closed.
+func (c *Client) Disconnect(ctx context.Context) error {
+ c.endSessions(ctx)
+ return replaceTopologyErr(c.topology.Disconnect(ctx))
+}
+
+// Ping verifies that the client can connect to the topology.
+// If readPreference is nil then will use the client's default read
+// preference.
+func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
+ if ctx == nil {
+ ctx = context.Background()
+ }
+
+ if rp == nil {
+ rp = c.readPreference
+ }
+
+ _, err := c.topology.SelectServer(ctx, description.ReadPrefSelector(rp))
+ return replaceTopologyErr(err)
+}
+
+// StartSession starts a new session.
+func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) {
+ if c.topology.SessionPool == nil {
+ return nil, ErrClientDisconnected
+ }
+
+ sopts := options.MergeSessionOptions(opts...)
+ coreOpts := &session.ClientOptions{
+ DefaultReadConcern: c.readConcern,
+ DefaultReadPreference: c.readPreference,
+ DefaultWriteConcern: c.writeConcern,
+ }
+ if sopts.CausalConsistency != nil {
+ coreOpts.CausalConsistency = sopts.CausalConsistency
+ }
+ if sopts.DefaultReadConcern != nil {
+ coreOpts.DefaultReadConcern = sopts.DefaultReadConcern
+ }
+ if sopts.DefaultWriteConcern != nil {
+ coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern
+ }
+ if sopts.DefaultReadPreference != nil {
+ coreOpts.DefaultReadPreference = sopts.DefaultReadPreference
+ }
+
+ sess, err := session.NewClientSession(c.topology.SessionPool, c.id, session.Explicit, coreOpts)
+ if err != nil {
+ return nil, replaceTopologyErr(err)
+ }
+
+ sess.RetryWrite = c.retryWrites
+
+ return &sessionImpl{
+ Client: sess,
+ topo: c.topology,
+ }, nil
+}
+
+func (c *Client) endSessions(ctx context.Context) {
+ if c.topology.SessionPool == nil {
+ return
+ }
+ cmd := command.EndSessions{
+ Clock: c.clock,
+ SessionIDs: c.topology.SessionPool.IDSlice(),
+ }
+
+ _, _ = driver.EndSessions(ctx, cmd, c.topology, description.ReadPrefSelector(readpref.PrimaryPreferred()))
+}
+
+func newClient(cs connstring.ConnString, opts ...*options.ClientOptions) (*Client, error) {
+ clientOpt := options.MergeClientOptions(cs, opts...)
+
+ client := &Client{
+ topologyOptions: clientOpt.TopologyOptions,
+ connString: clientOpt.ConnString,
+ localThreshold: defaultLocalThreshold,
+ readPreference: clientOpt.ReadPreference,
+ readConcern: clientOpt.ReadConcern,
+ writeConcern: clientOpt.WriteConcern,
+ registry: clientOpt.Registry,
+ }
+
+ if client.connString.RetryWritesSet {
+ client.retryWrites = client.connString.RetryWrites
+ }
+ if clientOpt.RetryWrites != nil {
+ client.retryWrites = *clientOpt.RetryWrites
+ }
+
+ clientID, err := uuid.New()
+ if err != nil {
+ return nil, err
+ }
+ client.id = clientID
+
+ topts := append(
+ client.topologyOptions,
+ topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return client.connString }),
+ topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
+ return append(opts, topology.WithClock(func(clock *session.ClusterClock) *session.ClusterClock {
+ return client.clock
+ }), topology.WithRegistry(func(registry *bsoncodec.Registry) *bsoncodec.Registry {
+ return client.registry
+ }))
+ }),
+ )
+ topo, err := topology.New(topts...)
+ if err != nil {
+ return nil, replaceTopologyErr(err)
+ }
+ client.topology = topo
+ client.clock = &session.ClusterClock{}
+
+ if client.readConcern == nil {
+ client.readConcern = readConcernFromConnString(&client.connString)
+
+ if client.readConcern == nil {
+ // no read concern in conn string
+ client.readConcern = readconcern.New()
+ }
+ }
+
+ if client.writeConcern == nil {
+ client.writeConcern = writeConcernFromConnString(&client.connString)
+ }
+ if client.readPreference == nil {
+ rp, err := readPreferenceFromConnString(&client.connString)
+ if err != nil {
+ return nil, err
+ }
+ if rp != nil {
+ client.readPreference = rp
+ } else {
+ client.readPreference = readpref.Primary()
+ }
+ }
+
+ if client.registry == nil {
+ client.registry = bson.DefaultRegistry
+ }
+ return client, nil
+}
+
+func readConcernFromConnString(cs *connstring.ConnString) *readconcern.ReadConcern {
+ if len(cs.ReadConcernLevel) == 0 {
+ return nil
+ }
+
+ rc := &readconcern.ReadConcern{}
+ readconcern.Level(cs.ReadConcernLevel)(rc)
+
+ return rc
+}
+
+func writeConcernFromConnString(cs *connstring.ConnString) *writeconcern.WriteConcern {
+ var wc *writeconcern.WriteConcern
+
+ if len(cs.WString) > 0 {
+ if wc == nil {
+ wc = writeconcern.New()
+ }
+
+ writeconcern.WTagSet(cs.WString)(wc)
+ } else if cs.WNumberSet {
+ if wc == nil {
+ wc = writeconcern.New()
+ }
+
+ writeconcern.W(cs.WNumber)(wc)
+ }
+
+ if cs.JSet {
+ if wc == nil {
+ wc = writeconcern.New()
+ }
+
+ writeconcern.J(cs.J)(wc)
+ }
+
+ if cs.WTimeoutSet {
+ if wc == nil {
+ wc = writeconcern.New()
+ }
+
+ writeconcern.WTimeout(cs.WTimeout)(wc)
+ }
+
+ return wc
+}
+
+func readPreferenceFromConnString(cs *connstring.ConnString) (*readpref.ReadPref, error) {
+ var rp *readpref.ReadPref
+ var err error
+ options := make([]readpref.Option, 0, 1)
+
+ tagSets := tag.NewTagSetsFromMaps(cs.ReadPreferenceTagSets)
+ if len(tagSets) > 0 {
+ options = append(options, readpref.WithTagSets(tagSets...))
+ }
+
+ if cs.MaxStaleness != 0 {
+ options = append(options, readpref.WithMaxStaleness(cs.MaxStaleness))
+ }
+
+ if len(cs.ReadPreference) > 0 {
+ if rp == nil {
+ mode, _ := readpref.ModeFromString(cs.ReadPreference)
+ rp, err = readpref.New(mode, options...)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ return rp, nil
+}
+
+// ValidSession returns an error if the session doesn't belong to the client
+func (c *Client) ValidSession(sess *session.Client) error {
+ if sess != nil && !uuid.Equal(sess.ClientID, c.id) {
+ return ErrWrongClient
+ }
+ return nil
+}
+
+// Database returns a handle for a given database.
+func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database {
+ return newDatabase(c, name, opts...)
+}
+
+// ConnectionString returns the connection string of the cluster the client is connected to.
+func (c *Client) ConnectionString() string {
+ return c.connString.Original
+}
+
+// ListDatabases returns a ListDatabasesResult.
+func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) {
+ if ctx == nil {
+ ctx = context.Background()
+ }
+
+ sess := sessionFromContext(ctx)
+
+ err := c.ValidSession(sess)
+ if err != nil {
+ return ListDatabasesResult{}, err
+ }
+
+ f, err := transformDocument(c.registry, filter)
+ if err != nil {
+ return ListDatabasesResult{}, err
+ }
+
+ cmd := command.ListDatabases{
+ Filter: f,
+ Session: sess,
+ Clock: c.clock,
+ }
+
+ readSelector := description.CompositeSelector([]description.ServerSelector{
+ description.ReadPrefSelector(readpref.Primary()),
+ description.LatencySelector(c.localThreshold),
+ })
+ res, err := driver.ListDatabases(
+ ctx, cmd,
+ c.topology,
+ readSelector,
+ c.id,
+ c.topology.SessionPool,
+ opts...,
+ )
+ if err != nil {
+ return ListDatabasesResult{}, replaceTopologyErr(err)
+ }
+
+ return (ListDatabasesResult{}).fromResult(res), nil
+}
+
+// ListDatabaseNames returns a slice containing the names of all of the databases on the server.
+func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) {
+ opts = append(opts, options.ListDatabases().SetNameOnly(true))
+
+ res, err := c.ListDatabases(ctx, filter, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ names := make([]string, 0)
+ for _, spec := range res.Databases {
+ names = append(names, spec.Name)
+ }
+
+ return names, nil
+}
+
+// WithSession allows a user to start a session themselves and manage
+// its lifetime. The only way to provide a session to a CRUD method is
+// to invoke that CRUD method with the mongo.SessionContext within the
+// closure. The mongo.SessionContext can be used as a regular context,
+// so methods like context.WithDeadline and context.WithTimeout are
+// supported.
+//
+// If the context.Context already has a mongo.Session attached, that
+// mongo.Session will be replaced with the one provided.
+//
+// Errors returned from the closure are transparently returned from
+// this function.
+func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error {
+ return fn(contextWithSession(ctx, sess))
+}
+
+// UseSession creates a default session, that is only valid for the
+// lifetime of the closure. No cleanup outside of closing the session
+// is done upon exiting the closure. This means that an outstanding
+// transaction will be aborted, even if the closure returns an error.
+//
+// If ctx already contains a mongo.Session, that mongo.Session will be
+// replaced with the newly created mongo.Session.
+//
+// Errors returned from the closure are transparently returned from
+// this method.
+func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error {
+ return c.UseSessionWithOptions(ctx, options.Session(), fn)
+}
+
+// UseSessionWithOptions works like UseSession but allows the caller
+// to specify the options used to create the session.
+func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error {
+ defaultSess, err := c.StartSession(opts)
+ if err != nil {
+ return err
+ }
+
+ defer defaultSess.EndSession(ctx)
+
+ sessCtx := sessionContext{
+ Context: context.WithValue(ctx, sessionKey{}, defaultSess),
+ Session: defaultSess,
+ }
+
+ return fn(sessCtx)
+}
+
+// Watch returns a change stream cursor used to receive information of changes to the client. This method is preferred
+// to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors.
+// The client must have read concern majority or no read concern for a change stream to be created successfully.
+func (c *Client) Watch(ctx context.Context, pipeline interface{},
+ opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
+
+ return newClientChangeStream(ctx, c, pipeline, opts...)
+}