seba-365 - implemented dep
Change-Id: Ia6226d50e7615935a0c8876809a687427ff88c22
diff --git a/vendor/github.com/mongodb/mongo-go-driver/mongo/collection.go b/vendor/github.com/mongodb/mongo-go-driver/mongo/collection.go
new file mode 100644
index 0000000..fb16775
--- /dev/null
+++ b/vendor/github.com/mongodb/mongo-go-driver/mongo/collection.go
@@ -0,0 +1,1298 @@
+// Copyright (C) MongoDB, Inc. 2017-present.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+package mongo
+
+import (
+ "context"
+ "errors"
+ "strings"
+
+ "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
+ "github.com/mongodb/mongo-go-driver/mongo/options"
+ "github.com/mongodb/mongo-go-driver/mongo/readconcern"
+ "github.com/mongodb/mongo-go-driver/mongo/readpref"
+ "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
+ "github.com/mongodb/mongo-go-driver/x/bsonx"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver"
+ "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
+ "github.com/mongodb/mongo-go-driver/x/network/command"
+ "github.com/mongodb/mongo-go-driver/x/network/description"
+)
+
+// Collection is a handle for running operations against one collection of a database.
+type Collection struct {
+ client *Client // owning client; operations read its topology, clock, id and retryWrites
+ db *Database // parent database; its name is the DB part of the namespace
+ name string // collection name
+ readConcern *readconcern.ReadConcern // used by reads; cleared while a transaction is in progress
+ writeConcern *writeconcern.WriteConcern // used by writes; cleared while a transaction is running
+ readPreference *readpref.ReadPref // passed as ReadPref on read commands
+ readSelector description.ServerSelector // ReadPrefSelector composed with a LatencySelector
+ writeSelector description.ServerSelector // WriteSelector composed with a LatencySelector
+ registry *bsoncodec.Registry // codec registry used to convert caller-supplied documents
+}
+
+// newCollection builds the Collection handle called name inside db.
+//
+// Read concern, write concern, read preference and codec registry are inherited
+// from db unless the merged opts supply a non-nil replacement. The read and write
+// server selectors are built last, once the effective read preference is known.
+func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection {
+	merged := options.MergeCollectionOptions(opts...)
+
+	// Seed every setting from the parent database; explicit options override below.
+	created := &Collection{
+		client:         db.client,
+		db:             db,
+		name:           name,
+		readConcern:    db.readConcern,
+		writeConcern:   db.writeConcern,
+		readPreference: db.readPreference,
+		registry:       db.registry,
+	}
+	if merged.ReadConcern != nil {
+		created.readConcern = merged.ReadConcern
+	}
+
+	if merged.WriteConcern != nil {
+		created.writeConcern = merged.WriteConcern
+	}
+
+	if merged.ReadPreference != nil {
+		created.readPreference = merged.ReadPreference
+	}
+
+	if merged.Registry != nil {
+		created.registry = merged.Registry
+	}
+
+	// Reads select through the effective read preference, writes through WriteSelector;
+	// each is composed with a LatencySelector built from the client's localThreshold.
+	created.readSelector = description.CompositeSelector([]description.ServerSelector{
+		description.ReadPrefSelector(created.readPreference),
+		description.LatencySelector(db.client.localThreshold),
+	})
+	created.writeSelector = description.CompositeSelector([]description.ServerSelector{
+		description.WriteSelector(),
+		description.LatencySelector(db.client.localThreshold),
+	})
+
+	return created
+}
+
+// copy returns a new Collection holding the same field values as coll.
+//
+// The duplicate is shallow: the client, database, concerns, read preference,
+// selectors and registry it refers to are shared with the receiver. Reassigning
+// a field on the result leaves the receiver untouched.
+func (coll *Collection) copy() *Collection {
+	// Assigning the dereferenced struct duplicates every field in one step,
+	// which yields the same result as listing client, db, name, concerns,
+	// read preference, selectors and registry one by one.
+	dup := *coll
+
+	return &dup
+}
+
+// Clone returns a copy of coll in which any non-nil read concern, write concern,
+// read preference or registry from the merged opts replaces the current value.
+// The read selector is rebuilt to match; the returned error is always nil.
+func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) {
+	cloned := coll.copy()
+	overrides := options.MergeCollectionOptions(opts...)
+
+	if rc := overrides.ReadConcern; rc != nil {
+		cloned.readConcern = rc
+	}
+	if wc := overrides.WriteConcern; wc != nil {
+		cloned.writeConcern = wc
+	}
+	if rp := overrides.ReadPreference; rp != nil {
+		cloned.readPreference = rp
+	}
+	if reg := overrides.Registry; reg != nil {
+		cloned.registry = reg
+	}
+
+	// Only the read selector depends on an overridable option (the read
+	// preference), so the write selector is kept as copied.
+	cloned.readSelector = description.CompositeSelector([]description.ServerSelector{
+		description.ReadPrefSelector(cloned.readPreference),
+		description.LatencySelector(cloned.client.localThreshold),
+	})
+	return cloned, nil
+}
+
+// Name returns the collection name this handle was created with.
+func (coll *Collection) Name() string {
+ return coll.name
+}
+
+// namespace returns the command.Namespace built from the parent database's name and this collection's name.
+func (coll *Collection) namespace() command.Namespace {
+ return command.NewNamespace(coll.db.name, coll.name)
+}
+
+// Database returns the Database handle stored on this collection when it was created.
+func (coll *Collection) Database() *Database {
+ return coll.db
+}
+
+// BulkWrite executes the given write models as a single bulk operation.
+//
+// An empty models slice yields ErrEmptySlice and a nil model yields ErrNilDocument.
+// When the driver call fails, an empty non-nil BulkWriteResult accompanies the error.
+//
+// See https://docs.mongodb.com/manual/core/bulk-write-operations/.
+func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
+	opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) {
+
+	if len(models) == 0 {
+		return nil, ErrEmptySlice
+	}
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	sess := sessionFromContext(ctx)
+	if err := coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// Translate the public models into the driver's representation.
+	converted := make([]driver.WriteModel, 0, len(models))
+	for _, m := range models {
+		if m == nil {
+			return nil, ErrNilDocument
+		}
+		converted = append(converted, m.convertModel())
+	}
+
+	bulkRes, err := driver.BulkWrite(
+		ctx,
+		coll.namespace(),
+		converted,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		sess,
+		coll.writeConcern,
+		coll.client.clock,
+		coll.registry,
+		opts...,
+	)
+	if err == nil {
+		return &BulkWriteResult{
+			InsertedCount: bulkRes.InsertedCount,
+			MatchedCount:  bulkRes.MatchedCount,
+			ModifiedCount: bulkRes.ModifiedCount,
+			DeletedCount:  bulkRes.DeletedCount,
+			UpsertedCount: bulkRes.UpsertedCount,
+			UpsertedIDs:   bulkRes.UpsertedIDs,
+		}, nil
+	}
+
+	// A driver-level bulk exception is re-expressed in this package's exception type.
+	if bwe, ok := err.(driver.BulkWriteException); ok {
+		return &BulkWriteResult{}, BulkWriteException{
+			WriteConcernError: convertWriteConcernError(bwe.WriteConcernError),
+			WriteErrors:       convertBulkWriteErrors(bwe.WriteErrors),
+		}
+	}
+	return &BulkWriteResult{}, replaceTopologyErr(err)
+}
+
+// InsertOne inserts a single document into the collection.
+// Nil entries in opts are ignored; the ID reported by transformAndEnsureID is returned.
+func (coll *Collection) InsertOne(ctx context.Context, document interface{},
+	opts ...*options.InsertOneOptions) (*InsertOneResult, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	doc, insertedID, err := transformAndEnsureID(coll.registry, document)
+	if err != nil {
+		return nil, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+	oldns := coll.namespace()
+	cmd := command.Insert{
+		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
+		Docs:         []bsonx.Doc{doc},
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	// driver.Insert takes InsertManyOptions; a nil entry is skipped because reading its field would panic.
+	insertOpts := make([]*options.InsertManyOptions, 0, len(opts))
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		converted := options.InsertMany()
+		converted.BypassDocumentValidation = opt.BypassDocumentValidation
+		insertOpts = append(insertOpts, converted)
+	}
+
+	res, err := driver.Insert(
+		ctx, cmd,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		insertOpts...,
+	)
+	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
+	if rr&rrOne == 0 {
+		return nil, err
+	}
+	return &InsertOneResult{InsertedID: insertedID}, err
+}
+
+// InsertMany inserts the provided documents.
+//
+// ErrEmptySlice is returned for an empty documents slice and ErrNilDocument for a
+// nil element. Write errors and write concern errors in the driver result are
+// returned as a BulkWriteException together with the collected InsertedIDs.
+func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
+	opts ...*options.InsertManyOptions) (*InsertManyResult, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	if len(documents) == 0 {
+		return nil, ErrEmptySlice
+	}
+
+	// ids[i] is the ID reported for documents[i]; payload[i] is its converted form.
+	ids := make([]interface{}, len(documents))
+	payload := make([]bsonx.Doc, len(documents))
+	for idx := range documents {
+		if documents[idx] == nil {
+			return nil, ErrNilDocument
+		}
+		converted, id, err := transformAndEnsureID(coll.registry, documents[idx])
+		if err != nil {
+			return nil, err
+		}
+		payload[idx], ids[idx] = converted, id
+	}
+
+	sess := sessionFromContext(ctx)
+	if err := coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// The collection's write concern is cleared while a transaction is running.
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil
+	}
+
+	ns := coll.namespace()
+	insert := command.Insert{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Docs:         payload,
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.Insert(
+		ctx, insert,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		opts...,
+	)
+	if err == command.ErrUnacknowledgedWrite {
+		return &InsertManyResult{InsertedIDs: ids}, ErrUnacknowledgedWrite
+	}
+	if err != nil {
+		return nil, replaceTopologyErr(err)
+	}
+
+	// err is nil here; it is only replaced when the result carries failures.
+	if len(res.WriteErrors) > 0 || res.WriteConcernError != nil {
+		failures := make([]BulkWriteError, len(res.WriteErrors))
+		for i, we := range res.WriteErrors {
+			failures[i] = BulkWriteError{
+				WriteError{
+					Index:   we.Index,
+					Code:    we.Code,
+					Message: we.ErrMsg,
+				},
+				nil,
+			}
+		}
+
+		err = BulkWriteException{
+			WriteErrors:       failures,
+			WriteConcernError: convertWriteConcernError(res.WriteConcernError),
+		}
+	}
+
+	return &InsertManyResult{InsertedIDs: ids}, err
+}
+
+// DeleteOne deletes a single document from the collection.
+//
+// The delete statement is sent with limit 1. A nil result is returned unless the
+// value produced by processWriteError has the rrOne bit set.
+func (coll *Collection) DeleteOne(ctx context.Context, filter interface{},
+	opts ...*options.DeleteOptions) (*DeleteResult, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+	statement := bsonx.Doc{
+		{"q", bsonx.Document(query)},
+		{"limit", bsonx.Int32(1)},
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// The collection's write concern is cleared while a transaction is running.
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil
+	}
+
+	ns := coll.namespace()
+	del := command.Delete{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Deletes:      []bsonx.Doc{statement},
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.Delete(
+		ctx, del,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		opts...,
+	)
+
+	outcome, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
+	if outcome&rrOne == 0 {
+		return nil, err
+	}
+	return &DeleteResult{DeletedCount: int64(res.N)}, err
+}
+
+// DeleteMany deletes multiple documents from the collection.
+//
+// The delete statement is sent with limit 0, and the driver.Delete argument that
+// follows the session pool is fixed to false instead of the client's retryWrites.
+func (coll *Collection) DeleteMany(ctx context.Context, filter interface{},
+	opts ...*options.DeleteOptions) (*DeleteResult, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+	statement := bsonx.Doc{{"q", bsonx.Document(query)}, {"limit", bsonx.Int32(0)}}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+
+	ns := coll.namespace()
+	del := command.Delete{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Deletes:      []bsonx.Doc{statement},
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.Delete(
+		ctx, del,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		false,
+		opts...,
+	)
+	outcome, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
+	if outcome&rrMany == 0 {
+		return nil, err
+	}
+	return &DeleteResult{DeletedCount: int64(res.N)}, err
+}
+
+// updateOrReplaceOne sends one non-multi update statement built from filter and
+// update; UpdateOne and ReplaceOne both delegate to it. When the driver result lists
+// an upsert, the first upserted ID is returned and MatchedCount is reduced by one.
+func (coll *Collection) updateOrReplaceOne(ctx context.Context, filter,
+	update bsonx.Doc, sess *session.Client, opts ...*options.UpdateOptions) (*UpdateResult, error) {
+	// TODO: should session be taken from ctx or left as argument?
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	statement := bsonx.Doc{
+		{"q", bsonx.Document(filter)},
+		{"u", bsonx.Document(update)},
+		{"multi", bsonx.Boolean(false)},
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+
+	ns := coll.namespace()
+	upd := command.Update{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Docs:         []bsonx.Doc{statement},
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	raw, err := driver.Update(
+		ctx, upd,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		opts...,
+	)
+	// An unacknowledged write is not returned here; it is handed to processWriteError below.
+	if !(err == nil || err == command.ErrUnacknowledgedWrite) {
+		return nil, replaceTopologyErr(err)
+	}
+	summary := &UpdateResult{
+		MatchedCount:  raw.MatchedCount,
+		ModifiedCount: raw.ModifiedCount,
+		UpsertedCount: int64(len(raw.Upserted)),
+	}
+	if len(raw.Upserted) > 0 {
+		summary.UpsertedID = raw.Upserted[0].ID
+		summary.MatchedCount--
+	}
+
+	outcome, err := processWriteError(raw.WriteConcernError, raw.WriteErrors, err)
+	if outcome&rrOne == 0 {
+		return nil, err
+	}
+	return summary, err
+}
+
+// UpdateOne updates a single document in the collection.
+//
+// Both filter and update are converted with the collection's registry, and the
+// converted update must pass ensureDollarKey before anything is sent.
+func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
+	opts ...*options.UpdateOptions) (*UpdateResult, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+
+	change, err := transformDocument(coll.registry, update)
+	if err != nil {
+		return nil, err
+	}
+	if err = ensureDollarKey(change); err != nil {
+		return nil, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	return coll.updateOrReplaceOne(ctx, query, change, sess, opts...)
+}
+
+// UpdateMany updates multiple documents in the collection.
+//
+// The statement is sent with multi set to true, and the driver.Update argument that
+// follows the session pool is fixed to false instead of the client's retryWrites.
+// When upserts are listed, only the first ID is returned and MatchedCount drops by one.
+func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
+	opts ...*options.UpdateOptions) (*UpdateResult, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+
+	change, err := transformDocument(coll.registry, update)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = ensureDollarKey(change); err != nil {
+		return nil, err
+	}
+
+	statement := bsonx.Doc{
+		{"q", bsonx.Document(query)},
+		{"u", bsonx.Document(change)},
+		{"multi", bsonx.Boolean(true)},
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+
+	ns := coll.namespace()
+	upd := command.Update{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Docs:         []bsonx.Doc{statement},
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	raw, err := driver.Update(
+		ctx, upd,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		false,
+		opts...,
+	)
+	if !(err == nil || err == command.ErrUnacknowledgedWrite) {
+		return nil, replaceTopologyErr(err)
+	}
+	summary := &UpdateResult{
+		MatchedCount:  raw.MatchedCount,
+		ModifiedCount: raw.ModifiedCount,
+		UpsertedCount: int64(len(raw.Upserted)),
+	}
+	// TODO(skriptble): Is this correct? Do we only return the first upserted ID for an UpdateMany?
+	if len(raw.Upserted) > 0 {
+		summary.UpsertedID = raw.Upserted[0].ID
+		summary.MatchedCount--
+	}
+
+	outcome, err := processWriteError(raw.WriteConcernError, raw.WriteErrors, err)
+	if outcome&rrMany == 0 {
+		return nil, err
+	}
+	return summary, err
+}
+
+// ReplaceOne replaces a single document in the collection.
+// Only the replacement's first key is checked for a '$' prefix; nil entries in opts are skipped.
+func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
+	replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	f, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+	r, err := transformDocument(coll.registry, replacement)
+	if err != nil {
+		return nil, err
+	}
+	if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
+		return nil, errors.New("replacement document cannot contains keys beginning with '$")
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// updateOrReplaceOne takes UpdateOptions; a nil entry is skipped because reading its fields would panic.
+	updateOptions := make([]*options.UpdateOptions, 0, len(opts))
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		uOpts := options.Update()
+		uOpts.BypassDocumentValidation = opt.BypassDocumentValidation
+		uOpts.Collation = opt.Collation
+		uOpts.Upsert = opt.Upsert
+		updateOptions = append(updateOptions, uOpts)
+	}
+
+	return coll.updateOrReplaceOne(ctx, f, r, sess, updateOptions...)
+}
+
+// Aggregate runs an aggregation framework pipeline and wraps the result in a Cursor.
+//
+// driver.Aggregate is given both the read and the write selector. The write concern is
+// cleared while a transaction is running and the read concern while one is in progress.
+//
+// See https://docs.mongodb.com/manual/aggregation/.
+func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
+	opts ...*options.AggregateOptions) (*Cursor, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	stages, err := transformAggregatePipeline(coll.registry, pipeline)
+	if err != nil {
+		return nil, err
+	}
+	merged := options.MergeAggregateOptions(opts...)
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	rc, wc := coll.readConcern, coll.writeConcern
+	if sess != nil {
+		if sess.TransactionRunning() {
+			wc = nil
+		}
+		if sess.TransactionInProgress() {
+			rc = nil
+		}
+	}
+
+	ns := coll.namespace()
+	agg := command.Aggregate{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Pipeline:     stages,
+		ReadPref:     coll.readPreference,
+		WriteConcern: wc,
+		ReadConcern:  rc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	batch, err := driver.Aggregate(
+		ctx, agg,
+		coll.client.topology,
+		coll.readSelector,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		merged,
+	)
+	if err != nil {
+		return nil, replaceTopologyErr(err)
+	}
+
+	cursor, err := newCursor(batch, coll.registry)
+	return cursor, replaceTopologyErr(err)
+}
+
+// Count gets the number of documents matching the filter.
+//
+// It issues a command.Count; the read concern is cleared while a transaction is in progress.
+func (coll *Collection) Count(ctx context.Context, filter interface{},
+	opts ...*options.CountOptions) (int64, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return 0, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return 0, err
+	}
+
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil
+	}
+
+	ns := coll.namespace()
+	cnt := command.Count{
+		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Query:       query,
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	total, err := driver.Count(
+		ctx, cnt,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		opts...,
+	)
+
+	return total, replaceTopologyErr(err)
+}
+
+// CountDocuments gets the number of documents matching the filter.
+//
+// The count runs as a pipeline that countDocumentsAggregatePipeline builds from filter
+// and the merged options; the read concern is cleared while a transaction is in progress.
+func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
+	opts ...*options.CountOptions) (int64, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	merged := options.MergeCountOptions(opts...)
+	stages, err := countDocumentsAggregatePipeline(coll.registry, filter, merged)
+	if err != nil {
+		return 0, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return 0, err
+	}
+
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil
+	}
+
+	ns := coll.namespace()
+	cnt := command.CountDocuments{
+		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Pipeline:    stages,
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	total, err := driver.CountDocuments(
+		ctx, cnt,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		merged,
+	)
+
+	return total, replaceTopologyErr(err)
+}
+
+// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata.
+// It issues a command.Count with an empty query. Only the last entry of opts is consulted, and
+// its MaxTime is forwarded only when both that entry and the field are non-nil.
+func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
+	opts ...*options.EstimatedDocumentCountOptions) (int64, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	sess := sessionFromContext(ctx)
+	if err := coll.client.ValidSession(sess); err != nil {
+		return 0, err
+	}
+
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil // cleared while a transaction is in progress
+	}
+
+	oldns := coll.namespace()
+	cmd := command.Count{
+		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
+		Query:       bsonx.Doc{},
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	// Dereferencing an unset MaxTime, or a nil options entry, would panic, so both are checked first.
+	countOpts := options.Count()
+	if n := len(opts); n > 0 && opts[n-1] != nil && opts[n-1].MaxTime != nil {
+		countOpts = countOpts.SetMaxTime(*opts[n-1].MaxTime)
+	}
+
+	count, err := driver.Count(
+		ctx, cmd,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		countOpts,
+	)
+
+	return count, replaceTopologyErr(err)
+}
+
+// Distinct finds the distinct values for a specified field across a single
+// collection, restricted to the documents matching filter. The Values of the
+// driver result are returned as-is.
+func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
+	opts ...*options.DistinctOptions) ([]interface{}, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// The collection's read concern is cleared while a transaction is in progress.
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil
+	}
+
+	ns := coll.namespace()
+	distinct := command.Distinct{
+		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Field:       fieldName,
+		Query:       query,
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	res, err := driver.Distinct(
+		ctx, distinct,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		opts...,
+	)
+	if err != nil {
+		return nil, replaceTopologyErr(err)
+	}
+
+	return res.Values, nil
+}
+
+// Find finds the documents matching a model.
+// The driver's batch cursor is wrapped in a Cursor that decodes with the collection's registry.
+func (coll *Collection) Find(ctx context.Context, filter interface{},
+	opts ...*options.FindOptions) (*Cursor, error) {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return nil, err
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return nil, err
+	}
+
+	// The collection's read concern is cleared while a transaction is in progress.
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil
+	}
+
+	ns := coll.namespace()
+	find := command.Find{
+		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Filter:      query,
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	batch, err := driver.Find(
+		ctx, find,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		opts...,
+	)
+	if err != nil {
+		return nil, replaceTopologyErr(err)
+	}
+
+	cursor, err := newCursor(batch, coll.registry)
+	return cursor, replaceTopologyErr(err)
+}
+
+// FindOne returns up to one document that matches the model.
+// Nil entries in opts are skipped; every failure is reported through the returned SingleResult.
+func (coll *Collection) FindOne(ctx context.Context, filter interface{},
+	opts ...*options.FindOneOptions) *SingleResult {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	f, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return &SingleResult{err: err}
+	}
+
+	rc := coll.readConcern
+	if sess != nil && sess.TransactionInProgress() {
+		rc = nil // cleared while a transaction is in progress
+	}
+
+	oldns := coll.namespace()
+	cmd := command.Find{
+		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
+		Filter:      f,
+		ReadPref:    coll.readPreference,
+		ReadConcern: rc,
+		Session:     sess,
+		Clock:       coll.client.clock,
+	}
+
+	// driver.Find takes FindOptions; a nil entry is skipped because reading its fields would panic.
+	findOpts := make([]*options.FindOptions, 0, len(opts))
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		findOpts = append(findOpts, &options.FindOptions{
+			AllowPartialResults: opt.AllowPartialResults,
+			BatchSize:           opt.BatchSize,
+			Collation:           opt.Collation,
+			Comment:             opt.Comment,
+			CursorType:          opt.CursorType,
+			Hint:                opt.Hint,
+			Max:                 opt.Max,
+			MaxAwaitTime:        opt.MaxAwaitTime,
+			Min:                 opt.Min,
+			NoCursorTimeout:     opt.NoCursorTimeout,
+			OplogReplay:         opt.OplogReplay,
+			Projection:          opt.Projection,
+			ReturnKey:           opt.ReturnKey,
+			ShowRecordID:        opt.ShowRecordID,
+			Skip:                opt.Skip,
+			Snapshot:            opt.Snapshot,
+			Sort:                opt.Sort,
+		})
+	}
+	batchCursor, err := driver.Find(
+		ctx, cmd,
+		coll.client.topology,
+		coll.readSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.registry,
+		findOpts...,
+	)
+	if err != nil {
+		return &SingleResult{err: replaceTopologyErr(err)}
+	}
+	cursor, err := newCursor(batchCursor, coll.registry)
+	return &SingleResult{cur: cursor, reg: coll.registry, err: replaceTopologyErr(err)}
+}
+
+// FindOneAndDelete finds a single document and deletes it, returning the
+// original in result. Every failure is reported through the returned
+// SingleResult rather than as a separate error value.
+func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
+	opts ...*options.FindOneAndDeleteOptions) *SingleResult {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return &SingleResult{err: err}
+	}
+
+	// The collection's write concern is cleared while a transaction is running.
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil
+	}
+
+	ns := coll.namespace()
+	op := command.FindOneAndDelete{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Query:        query,
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.FindOneAndDelete(
+		ctx, op,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		coll.registry,
+		opts...,
+	)
+	if err != nil {
+		return &SingleResult{err: replaceTopologyErr(err)}
+	}
+
+	return &SingleResult{rdr: res.Value, reg: coll.registry}
+}
+
+// FindOneAndReplace finds a single document and replaces it, returning either
+// the original or the replaced document. Every failure is reported through the
+// returned SingleResult rather than as a separate error value.
+func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
+	replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	repl, err := transformDocument(coll.registry, replacement)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	// Only the first key of the replacement is inspected for a '$' prefix.
+	if len(repl) > 0 && strings.HasPrefix(repl[0].Key, "$") {
+		return &SingleResult{err: errors.New("replacement document cannot contains keys beginning with '$")}
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return &SingleResult{err: err}
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+
+	ns := coll.namespace()
+	op := command.FindOneAndReplace{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Query:        query,
+		Replacement:  repl,
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.FindOneAndReplace(
+		ctx, op,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		coll.registry,
+		opts...,
+	)
+	if err != nil {
+		return &SingleResult{err: replaceTopologyErr(err)}
+	}
+
+	return &SingleResult{rdr: res.Value, reg: coll.registry}
+}
+
+// FindOneAndUpdate finds a single document and updates it, returning either
+// the original or the updated.
+//
+// The converted update must pass ensureDollarKey. Every failure is reported
+// through the returned SingleResult rather than as a separate error value.
+func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
+	update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult {
+
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	query, err := transformDocument(coll.registry, filter)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	change, err := transformDocument(coll.registry, update)
+	if err != nil {
+		return &SingleResult{err: err}
+	}
+
+	if err = ensureDollarKey(change); err != nil {
+		return &SingleResult{err: err}
+	}
+
+	sess := sessionFromContext(ctx)
+	if err = coll.client.ValidSession(sess); err != nil {
+		return &SingleResult{err: err}
+	}
+
+	// The collection's write concern is used unless the session has a
+	// transaction running, in which case it is cleared.
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil
+	}
+
+	ns := coll.namespace()
+	op := command.FindOneAndUpdate{
+		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
+		Query:        query,
+		Update:       change,
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+
+	res, err := driver.FindOneAndUpdate(
+		ctx, op,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+		coll.client.retryWrites,
+		coll.registry,
+		opts...,
+	)
+	if err != nil {
+		return &SingleResult{err: replaceTopologyErr(err)}
+	}
+
+	return &SingleResult{rdr: res.Value, reg: coll.registry}
+}
+
+// Watch returns a change stream cursor used to receive notifications of changes to the collection.
+// It passes ctx, the collection, pipeline and opts unchanged to newChangeStream.
+//
+// This method is preferred to running a raw aggregation with a $changeStream stage because it supports resumability
+// in the case of some errors. The collection must have read concern majority or no read concern for a change stream to be created successfully.
+func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
+ opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
+ return newChangeStream(ctx, coll, pipeline, opts...)
+}
+
+// Indexes returns an IndexView whose coll field refers to this collection.
+func (coll *Collection) Indexes() IndexView {
+ return IndexView{coll: coll}
+}
+
+// Drop drops this collection from database.
+// An error that command.IsNotFound recognises is swallowed and nil is returned;
+// any other error is passed through replaceTopologyErr.
+func (coll *Collection) Drop(ctx context.Context) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	sess := sessionFromContext(ctx)
+	if err := coll.client.ValidSession(sess); err != nil {
+		return err
+	}
+
+	wc := coll.writeConcern
+	if sess != nil && sess.TransactionRunning() {
+		wc = nil // cleared while a transaction is running
+	}
+
+	drop := command.DropCollection{
+		DB:           coll.db.name,
+		Collection:   coll.name,
+		WriteConcern: wc,
+		Session:      sess,
+		Clock:        coll.client.clock,
+	}
+	_, err := driver.DropCollection(
+		ctx, drop,
+		coll.client.topology,
+		coll.writeSelector,
+		coll.client.id,
+		coll.client.topology.SessionPool,
+	)
+	if err == nil || command.IsNotFound(err) {
+		return nil
+	}
+	return replaceTopologyErr(err)
+}