blob: fb167753eed528345b4e58e1ccb398ae3eab93f7 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10 "context"
11 "errors"
12 "strings"
13
14 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
15 "github.com/mongodb/mongo-go-driver/mongo/options"
16 "github.com/mongodb/mongo-go-driver/mongo/readconcern"
17 "github.com/mongodb/mongo-go-driver/mongo/readpref"
18 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
19 "github.com/mongodb/mongo-go-driver/x/bsonx"
20 "github.com/mongodb/mongo-go-driver/x/mongo/driver"
21 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
22 "github.com/mongodb/mongo-go-driver/x/network/command"
23 "github.com/mongodb/mongo-go-driver/x/network/description"
24)
25
26// Collection performs operations on a given collection.
27type Collection struct {
28 client *Client
29 db *Database
30 name string
31 readConcern *readconcern.ReadConcern
32 writeConcern *writeconcern.WriteConcern
33 readPreference *readpref.ReadPref
34 readSelector description.ServerSelector
35 writeSelector description.ServerSelector
36 registry *bsoncodec.Registry
37}
38
39func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection {
40 collOpt := options.MergeCollectionOptions(opts...)
41
42 rc := db.readConcern
43 if collOpt.ReadConcern != nil {
44 rc = collOpt.ReadConcern
45 }
46
47 wc := db.writeConcern
48 if collOpt.WriteConcern != nil {
49 wc = collOpt.WriteConcern
50 }
51
52 rp := db.readPreference
53 if collOpt.ReadPreference != nil {
54 rp = collOpt.ReadPreference
55 }
56
57 reg := db.registry
58 if collOpt.Registry != nil {
59 reg = collOpt.Registry
60 }
61
62 readSelector := description.CompositeSelector([]description.ServerSelector{
63 description.ReadPrefSelector(rp),
64 description.LatencySelector(db.client.localThreshold),
65 })
66
67 writeSelector := description.CompositeSelector([]description.ServerSelector{
68 description.WriteSelector(),
69 description.LatencySelector(db.client.localThreshold),
70 })
71
72 coll := &Collection{
73 client: db.client,
74 db: db,
75 name: name,
76 readPreference: rp,
77 readConcern: rc,
78 writeConcern: wc,
79 readSelector: readSelector,
80 writeSelector: writeSelector,
81 registry: reg,
82 }
83
84 return coll
85}
86
87func (coll *Collection) copy() *Collection {
88 return &Collection{
89 client: coll.client,
90 db: coll.db,
91 name: coll.name,
92 readConcern: coll.readConcern,
93 writeConcern: coll.writeConcern,
94 readPreference: coll.readPreference,
95 readSelector: coll.readSelector,
96 writeSelector: coll.writeSelector,
97 registry: coll.registry,
98 }
99}
100
101// Clone creates a copy of this collection with updated options, if any are given.
102func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) {
103 copyColl := coll.copy()
104 optsColl := options.MergeCollectionOptions(opts...)
105
106 if optsColl.ReadConcern != nil {
107 copyColl.readConcern = optsColl.ReadConcern
108 }
109
110 if optsColl.WriteConcern != nil {
111 copyColl.writeConcern = optsColl.WriteConcern
112 }
113
114 if optsColl.ReadPreference != nil {
115 copyColl.readPreference = optsColl.ReadPreference
116 }
117
118 if optsColl.Registry != nil {
119 copyColl.registry = optsColl.Registry
120 }
121
122 copyColl.readSelector = description.CompositeSelector([]description.ServerSelector{
123 description.ReadPrefSelector(copyColl.readPreference),
124 description.LatencySelector(copyColl.client.localThreshold),
125 })
126
127 return copyColl, nil
128}
129
130// Name provides access to the name of the collection.
131func (coll *Collection) Name() string {
132 return coll.name
133}
134
135// namespace returns the namespace of the collection.
136func (coll *Collection) namespace() command.Namespace {
137 return command.NewNamespace(coll.db.name, coll.name)
138}
139
140// Database provides access to the database that contains the collection.
141func (coll *Collection) Database() *Database {
142 return coll.db
143}
144
145// BulkWrite performs a bulk write operation.
146//
147// See https://docs.mongodb.com/manual/core/bulk-write-operations/.
148func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
149 opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) {
150
151 if len(models) == 0 {
152 return nil, ErrEmptySlice
153 }
154
155 if ctx == nil {
156 ctx = context.Background()
157 }
158
159 sess := sessionFromContext(ctx)
160
161 err := coll.client.ValidSession(sess)
162 if err != nil {
163 return nil, err
164 }
165
166 dispatchModels := make([]driver.WriteModel, len(models))
167 for i, model := range models {
168 if model == nil {
169 return nil, ErrNilDocument
170 }
171 dispatchModels[i] = model.convertModel()
172 }
173
174 res, err := driver.BulkWrite(
175 ctx,
176 coll.namespace(),
177 dispatchModels,
178 coll.client.topology,
179 coll.writeSelector,
180 coll.client.id,
181 coll.client.topology.SessionPool,
182 coll.client.retryWrites,
183 sess,
184 coll.writeConcern,
185 coll.client.clock,
186 coll.registry,
187 opts...,
188 )
189
190 if err != nil {
191 if conv, ok := err.(driver.BulkWriteException); ok {
192 return &BulkWriteResult{}, BulkWriteException{
193 WriteConcernError: convertWriteConcernError(conv.WriteConcernError),
194 WriteErrors: convertBulkWriteErrors(conv.WriteErrors),
195 }
196 }
197
198 return &BulkWriteResult{}, replaceTopologyErr(err)
199 }
200
201 return &BulkWriteResult{
202 InsertedCount: res.InsertedCount,
203 MatchedCount: res.MatchedCount,
204 ModifiedCount: res.ModifiedCount,
205 DeletedCount: res.DeletedCount,
206 UpsertedCount: res.UpsertedCount,
207 UpsertedIDs: res.UpsertedIDs,
208 }, nil
209}
210
211// InsertOne inserts a single document into the collection.
212func (coll *Collection) InsertOne(ctx context.Context, document interface{},
213 opts ...*options.InsertOneOptions) (*InsertOneResult, error) {
214
215 if ctx == nil {
216 ctx = context.Background()
217 }
218
219 doc, insertedID, err := transformAndEnsureID(coll.registry, document)
220 if err != nil {
221 return nil, err
222 }
223
224 sess := sessionFromContext(ctx)
225
226 err = coll.client.ValidSession(sess)
227 if err != nil {
228 return nil, err
229 }
230
231 wc := coll.writeConcern
232 if sess != nil && sess.TransactionRunning() {
233 wc = nil
234 }
235 oldns := coll.namespace()
236 cmd := command.Insert{
237 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
238 Docs: []bsonx.Doc{doc},
239 WriteConcern: wc,
240 Session: sess,
241 Clock: coll.client.clock,
242 }
243
244 // convert to InsertManyOptions so these can be argued to dispatch.Insert
245 insertOpts := make([]*options.InsertManyOptions, len(opts))
246 for i, opt := range opts {
247 insertOpts[i] = options.InsertMany()
248 insertOpts[i].BypassDocumentValidation = opt.BypassDocumentValidation
249 }
250
251 res, err := driver.Insert(
252 ctx, cmd,
253 coll.client.topology,
254 coll.writeSelector,
255 coll.client.id,
256 coll.client.topology.SessionPool,
257 coll.client.retryWrites,
258 insertOpts...,
259 )
260
261 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
262 if rr&rrOne == 0 {
263 return nil, err
264 }
265
266 return &InsertOneResult{InsertedID: insertedID}, err
267}
268
269// InsertMany inserts the provided documents.
270func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
271 opts ...*options.InsertManyOptions) (*InsertManyResult, error) {
272
273 if ctx == nil {
274 ctx = context.Background()
275 }
276
277 if len(documents) == 0 {
278 return nil, ErrEmptySlice
279 }
280
281 result := make([]interface{}, len(documents))
282 docs := make([]bsonx.Doc, len(documents))
283
284 for i, doc := range documents {
285 if doc == nil {
286 return nil, ErrNilDocument
287 }
288 bdoc, insertedID, err := transformAndEnsureID(coll.registry, doc)
289 if err != nil {
290 return nil, err
291 }
292
293 docs[i] = bdoc
294 result[i] = insertedID
295 }
296
297 sess := sessionFromContext(ctx)
298
299 err := coll.client.ValidSession(sess)
300 if err != nil {
301 return nil, err
302 }
303
304 wc := coll.writeConcern
305 if sess != nil && sess.TransactionRunning() {
306 wc = nil
307 }
308
309 oldns := coll.namespace()
310 cmd := command.Insert{
311 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
312 Docs: docs,
313 WriteConcern: wc,
314 Session: sess,
315 Clock: coll.client.clock,
316 }
317
318 res, err := driver.Insert(
319 ctx, cmd,
320 coll.client.topology,
321 coll.writeSelector,
322 coll.client.id,
323 coll.client.topology.SessionPool,
324 coll.client.retryWrites,
325 opts...,
326 )
327
328 switch err {
329 case nil:
330 case command.ErrUnacknowledgedWrite:
331 return &InsertManyResult{InsertedIDs: result}, ErrUnacknowledgedWrite
332 default:
333 return nil, replaceTopologyErr(err)
334 }
335 if len(res.WriteErrors) > 0 || res.WriteConcernError != nil {
336 bwErrors := make([]BulkWriteError, 0, len(res.WriteErrors))
337 for _, we := range res.WriteErrors {
338 bwErrors = append(bwErrors, BulkWriteError{
339 WriteError{
340 Index: we.Index,
341 Code: we.Code,
342 Message: we.ErrMsg,
343 },
344 nil,
345 })
346 }
347
348 err = BulkWriteException{
349 WriteErrors: bwErrors,
350 WriteConcernError: convertWriteConcernError(res.WriteConcernError),
351 }
352 }
353
354 return &InsertManyResult{InsertedIDs: result}, err
355}
356
357// DeleteOne deletes a single document from the collection.
358func (coll *Collection) DeleteOne(ctx context.Context, filter interface{},
359 opts ...*options.DeleteOptions) (*DeleteResult, error) {
360
361 if ctx == nil {
362 ctx = context.Background()
363 }
364
365 f, err := transformDocument(coll.registry, filter)
366 if err != nil {
367 return nil, err
368 }
369 deleteDocs := []bsonx.Doc{
370 {
371 {"q", bsonx.Document(f)},
372 {"limit", bsonx.Int32(1)},
373 },
374 }
375
376 sess := sessionFromContext(ctx)
377
378 err = coll.client.ValidSession(sess)
379 if err != nil {
380 return nil, err
381 }
382
383 wc := coll.writeConcern
384 if sess != nil && sess.TransactionRunning() {
385 wc = nil
386 }
387
388 oldns := coll.namespace()
389 cmd := command.Delete{
390 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
391 Deletes: deleteDocs,
392 WriteConcern: wc,
393 Session: sess,
394 Clock: coll.client.clock,
395 }
396
397 res, err := driver.Delete(
398 ctx, cmd,
399 coll.client.topology,
400 coll.writeSelector,
401 coll.client.id,
402 coll.client.topology.SessionPool,
403 coll.client.retryWrites,
404 opts...,
405 )
406
407 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
408 if rr&rrOne == 0 {
409 return nil, err
410 }
411 return &DeleteResult{DeletedCount: int64(res.N)}, err
412}
413
414// DeleteMany deletes multiple documents from the collection.
415func (coll *Collection) DeleteMany(ctx context.Context, filter interface{},
416 opts ...*options.DeleteOptions) (*DeleteResult, error) {
417
418 if ctx == nil {
419 ctx = context.Background()
420 }
421
422 f, err := transformDocument(coll.registry, filter)
423 if err != nil {
424 return nil, err
425 }
426 deleteDocs := []bsonx.Doc{{{"q", bsonx.Document(f)}, {"limit", bsonx.Int32(0)}}}
427
428 sess := sessionFromContext(ctx)
429
430 err = coll.client.ValidSession(sess)
431 if err != nil {
432 return nil, err
433 }
434
435 wc := coll.writeConcern
436 if sess != nil && sess.TransactionRunning() {
437 wc = nil
438 }
439
440 oldns := coll.namespace()
441 cmd := command.Delete{
442 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
443 Deletes: deleteDocs,
444 WriteConcern: wc,
445 Session: sess,
446 Clock: coll.client.clock,
447 }
448
449 res, err := driver.Delete(
450 ctx, cmd,
451 coll.client.topology,
452 coll.writeSelector,
453 coll.client.id,
454 coll.client.topology.SessionPool,
455 false,
456 opts...,
457 )
458
459 rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
460 if rr&rrMany == 0 {
461 return nil, err
462 }
463 return &DeleteResult{DeletedCount: int64(res.N)}, err
464}
465
466func (coll *Collection) updateOrReplaceOne(ctx context.Context, filter,
467 update bsonx.Doc, sess *session.Client, opts ...*options.UpdateOptions) (*UpdateResult, error) {
468
469 // TODO: should session be taken from ctx or left as argument?
470 if ctx == nil {
471 ctx = context.Background()
472 }
473
474 updateDocs := []bsonx.Doc{
475 {
476 {"q", bsonx.Document(filter)},
477 {"u", bsonx.Document(update)},
478 {"multi", bsonx.Boolean(false)},
479 },
480 }
481
482 wc := coll.writeConcern
483 if sess != nil && sess.TransactionRunning() {
484 wc = nil
485 }
486
487 oldns := coll.namespace()
488 cmd := command.Update{
489 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
490 Docs: updateDocs,
491 WriteConcern: wc,
492 Session: sess,
493 Clock: coll.client.clock,
494 }
495
496 r, err := driver.Update(
497 ctx, cmd,
498 coll.client.topology,
499 coll.writeSelector,
500 coll.client.id,
501 coll.client.topology.SessionPool,
502 coll.client.retryWrites,
503 opts...,
504 )
505 if err != nil && err != command.ErrUnacknowledgedWrite {
506 return nil, replaceTopologyErr(err)
507 }
508
509 res := &UpdateResult{
510 MatchedCount: r.MatchedCount,
511 ModifiedCount: r.ModifiedCount,
512 UpsertedCount: int64(len(r.Upserted)),
513 }
514 if len(r.Upserted) > 0 {
515 res.UpsertedID = r.Upserted[0].ID
516 res.MatchedCount--
517 }
518
519 rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
520 if rr&rrOne == 0 {
521 return nil, err
522 }
523 return res, err
524}
525
526// UpdateOne updates a single document in the collection.
527func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
528 opts ...*options.UpdateOptions) (*UpdateResult, error) {
529
530 if ctx == nil {
531 ctx = context.Background()
532 }
533
534 f, err := transformDocument(coll.registry, filter)
535 if err != nil {
536 return nil, err
537 }
538
539 u, err := transformDocument(coll.registry, update)
540 if err != nil {
541 return nil, err
542 }
543
544 if err := ensureDollarKey(u); err != nil {
545 return nil, err
546 }
547
548 sess := sessionFromContext(ctx)
549
550 err = coll.client.ValidSession(sess)
551 if err != nil {
552 return nil, err
553 }
554
555 return coll.updateOrReplaceOne(ctx, f, u, sess, opts...)
556}
557
558// UpdateMany updates multiple documents in the collection.
559func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
560 opts ...*options.UpdateOptions) (*UpdateResult, error) {
561
562 if ctx == nil {
563 ctx = context.Background()
564 }
565
566 f, err := transformDocument(coll.registry, filter)
567 if err != nil {
568 return nil, err
569 }
570
571 u, err := transformDocument(coll.registry, update)
572 if err != nil {
573 return nil, err
574 }
575
576 if err = ensureDollarKey(u); err != nil {
577 return nil, err
578 }
579
580 updateDocs := []bsonx.Doc{
581 {
582 {"q", bsonx.Document(f)},
583 {"u", bsonx.Document(u)},
584 {"multi", bsonx.Boolean(true)},
585 },
586 }
587
588 sess := sessionFromContext(ctx)
589
590 err = coll.client.ValidSession(sess)
591 if err != nil {
592 return nil, err
593 }
594
595 wc := coll.writeConcern
596 if sess != nil && sess.TransactionRunning() {
597 wc = nil
598 }
599
600 oldns := coll.namespace()
601 cmd := command.Update{
602 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
603 Docs: updateDocs,
604 WriteConcern: wc,
605 Session: sess,
606 Clock: coll.client.clock,
607 }
608
609 r, err := driver.Update(
610 ctx, cmd,
611 coll.client.topology,
612 coll.writeSelector,
613 coll.client.id,
614 coll.client.topology.SessionPool,
615 false,
616 opts...,
617 )
618 if err != nil && err != command.ErrUnacknowledgedWrite {
619 return nil, replaceTopologyErr(err)
620 }
621 res := &UpdateResult{
622 MatchedCount: r.MatchedCount,
623 ModifiedCount: r.ModifiedCount,
624 UpsertedCount: int64(len(r.Upserted)),
625 }
626 // TODO(skriptble): Is this correct? Do we only return the first upserted ID for an UpdateMany?
627 if len(r.Upserted) > 0 {
628 res.UpsertedID = r.Upserted[0].ID
629 res.MatchedCount--
630 }
631
632 rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
633 if rr&rrMany == 0 {
634 return nil, err
635 }
636 return res, err
637}
638
639// ReplaceOne replaces a single document in the collection.
640func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
641 replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) {
642
643 if ctx == nil {
644 ctx = context.Background()
645 }
646
647 f, err := transformDocument(coll.registry, filter)
648 if err != nil {
649 return nil, err
650 }
651
652 r, err := transformDocument(coll.registry, replacement)
653 if err != nil {
654 return nil, err
655 }
656
657 if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
658 return nil, errors.New("replacement document cannot contains keys beginning with '$")
659 }
660
661 sess := sessionFromContext(ctx)
662
663 err = coll.client.ValidSession(sess)
664 if err != nil {
665 return nil, err
666 }
667
668 updateOptions := make([]*options.UpdateOptions, 0, len(opts))
669 for _, opt := range opts {
670 uOpts := options.Update()
671 uOpts.BypassDocumentValidation = opt.BypassDocumentValidation
672 uOpts.Collation = opt.Collation
673 uOpts.Upsert = opt.Upsert
674 updateOptions = append(updateOptions, uOpts)
675 }
676
677 return coll.updateOrReplaceOne(ctx, f, r, sess, updateOptions...)
678}
679
680// Aggregate runs an aggregation framework pipeline.
681//
682// See https://docs.mongodb.com/manual/aggregation/.
683func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
684 opts ...*options.AggregateOptions) (*Cursor, error) {
685
686 if ctx == nil {
687 ctx = context.Background()
688 }
689
690 pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline)
691 if err != nil {
692 return nil, err
693 }
694
695 aggOpts := options.MergeAggregateOptions(opts...)
696
697 sess := sessionFromContext(ctx)
698
699 err = coll.client.ValidSession(sess)
700 if err != nil {
701 return nil, err
702 }
703
704 wc := coll.writeConcern
705 if sess != nil && sess.TransactionRunning() {
706 wc = nil
707 }
708
709 rc := coll.readConcern
710 if sess != nil && (sess.TransactionInProgress()) {
711 rc = nil
712 }
713
714 oldns := coll.namespace()
715 cmd := command.Aggregate{
716 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
717 Pipeline: pipelineArr,
718 ReadPref: coll.readPreference,
719 WriteConcern: wc,
720 ReadConcern: rc,
721 Session: sess,
722 Clock: coll.client.clock,
723 }
724
725 batchCursor, err := driver.Aggregate(
726 ctx, cmd,
727 coll.client.topology,
728 coll.readSelector,
729 coll.writeSelector,
730 coll.client.id,
731 coll.client.topology.SessionPool,
732 coll.registry,
733 aggOpts,
734 )
735 if err != nil {
736 return nil, replaceTopologyErr(err)
737 }
738
739 cursor, err := newCursor(batchCursor, coll.registry)
740 return cursor, replaceTopologyErr(err)
741}
742
743// Count gets the number of documents matching the filter.
744func (coll *Collection) Count(ctx context.Context, filter interface{},
745 opts ...*options.CountOptions) (int64, error) {
746
747 if ctx == nil {
748 ctx = context.Background()
749 }
750
751 f, err := transformDocument(coll.registry, filter)
752 if err != nil {
753 return 0, err
754 }
755
756 sess := sessionFromContext(ctx)
757
758 err = coll.client.ValidSession(sess)
759 if err != nil {
760 return 0, err
761 }
762
763 rc := coll.readConcern
764 if sess != nil && (sess.TransactionInProgress()) {
765 rc = nil
766 }
767
768 oldns := coll.namespace()
769 cmd := command.Count{
770 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
771 Query: f,
772 ReadPref: coll.readPreference,
773 ReadConcern: rc,
774 Session: sess,
775 Clock: coll.client.clock,
776 }
777
778 count, err := driver.Count(
779 ctx, cmd,
780 coll.client.topology,
781 coll.readSelector,
782 coll.client.id,
783 coll.client.topology.SessionPool,
784 coll.registry,
785 opts...,
786 )
787
788 return count, replaceTopologyErr(err)
789}
790
791// CountDocuments gets the number of documents matching the filter.
792func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
793 opts ...*options.CountOptions) (int64, error) {
794
795 if ctx == nil {
796 ctx = context.Background()
797 }
798
799 countOpts := options.MergeCountOptions(opts...)
800
801 pipelineArr, err := countDocumentsAggregatePipeline(coll.registry, filter, countOpts)
802 if err != nil {
803 return 0, err
804 }
805
806 sess := sessionFromContext(ctx)
807
808 err = coll.client.ValidSession(sess)
809 if err != nil {
810 return 0, err
811 }
812
813 rc := coll.readConcern
814 if sess != nil && (sess.TransactionInProgress()) {
815 rc = nil
816 }
817
818 oldns := coll.namespace()
819 cmd := command.CountDocuments{
820 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
821 Pipeline: pipelineArr,
822 ReadPref: coll.readPreference,
823 ReadConcern: rc,
824 Session: sess,
825 Clock: coll.client.clock,
826 }
827
828 count, err := driver.CountDocuments(
829 ctx, cmd,
830 coll.client.topology,
831 coll.readSelector,
832 coll.client.id,
833 coll.client.topology.SessionPool,
834 coll.registry,
835 countOpts,
836 )
837
838 return count, replaceTopologyErr(err)
839}
840
841// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata.
842func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
843 opts ...*options.EstimatedDocumentCountOptions) (int64, error) {
844
845 if ctx == nil {
846 ctx = context.Background()
847 }
848
849 sess := sessionFromContext(ctx)
850
851 err := coll.client.ValidSession(sess)
852 if err != nil {
853 return 0, err
854 }
855
856 rc := coll.readConcern
857 if sess != nil && (sess.TransactionInProgress()) {
858 rc = nil
859 }
860
861 oldns := coll.namespace()
862 cmd := command.Count{
863 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
864 Query: bsonx.Doc{},
865 ReadPref: coll.readPreference,
866 ReadConcern: rc,
867 Session: sess,
868 Clock: coll.client.clock,
869 }
870
871 countOpts := options.Count()
872 if len(opts) >= 1 {
873 countOpts = countOpts.SetMaxTime(*opts[len(opts)-1].MaxTime)
874 }
875
876 count, err := driver.Count(
877 ctx, cmd,
878 coll.client.topology,
879 coll.readSelector,
880 coll.client.id,
881 coll.client.topology.SessionPool,
882 coll.registry,
883 countOpts,
884 )
885
886 return count, replaceTopologyErr(err)
887}
888
889// Distinct finds the distinct values for a specified field across a single
890// collection.
891func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
892 opts ...*options.DistinctOptions) ([]interface{}, error) {
893
894 if ctx == nil {
895 ctx = context.Background()
896 }
897
898 f, err := transformDocument(coll.registry, filter)
899 if err != nil {
900 return nil, err
901 }
902
903 sess := sessionFromContext(ctx)
904
905 err = coll.client.ValidSession(sess)
906 if err != nil {
907 return nil, err
908 }
909
910 rc := coll.readConcern
911 if sess != nil && (sess.TransactionInProgress()) {
912 rc = nil
913 }
914
915 oldns := coll.namespace()
916 cmd := command.Distinct{
917 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
918 Field: fieldName,
919 Query: f,
920 ReadPref: coll.readPreference,
921 ReadConcern: rc,
922 Session: sess,
923 Clock: coll.client.clock,
924 }
925
926 res, err := driver.Distinct(
927 ctx, cmd,
928 coll.client.topology,
929 coll.readSelector,
930 coll.client.id,
931 coll.client.topology.SessionPool,
932 opts...,
933 )
934 if err != nil {
935 return nil, replaceTopologyErr(err)
936 }
937
938 return res.Values, nil
939}
940
941// Find finds the documents matching a model.
942func (coll *Collection) Find(ctx context.Context, filter interface{},
943 opts ...*options.FindOptions) (*Cursor, error) {
944
945 if ctx == nil {
946 ctx = context.Background()
947 }
948
949 f, err := transformDocument(coll.registry, filter)
950 if err != nil {
951 return nil, err
952 }
953
954 sess := sessionFromContext(ctx)
955
956 err = coll.client.ValidSession(sess)
957 if err != nil {
958 return nil, err
959 }
960
961 rc := coll.readConcern
962 if sess != nil && (sess.TransactionInProgress()) {
963 rc = nil
964 }
965
966 oldns := coll.namespace()
967 cmd := command.Find{
968 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
969 Filter: f,
970 ReadPref: coll.readPreference,
971 ReadConcern: rc,
972 Session: sess,
973 Clock: coll.client.clock,
974 }
975
976 batchCursor, err := driver.Find(
977 ctx, cmd,
978 coll.client.topology,
979 coll.readSelector,
980 coll.client.id,
981 coll.client.topology.SessionPool,
982 coll.registry,
983 opts...,
984 )
985 if err != nil {
986 return nil, replaceTopologyErr(err)
987 }
988
989 cursor, err := newCursor(batchCursor, coll.registry)
990 return cursor, replaceTopologyErr(err)
991}
992
993// FindOne returns up to one document that matches the model.
994func (coll *Collection) FindOne(ctx context.Context, filter interface{},
995 opts ...*options.FindOneOptions) *SingleResult {
996
997 if ctx == nil {
998 ctx = context.Background()
999 }
1000
1001 f, err := transformDocument(coll.registry, filter)
1002 if err != nil {
1003 return &SingleResult{err: err}
1004 }
1005
1006 sess := sessionFromContext(ctx)
1007
1008 err = coll.client.ValidSession(sess)
1009 if err != nil {
1010 return &SingleResult{err: err}
1011 }
1012
1013 rc := coll.readConcern
1014 if sess != nil && (sess.TransactionInProgress()) {
1015 rc = nil
1016 }
1017
1018 oldns := coll.namespace()
1019 cmd := command.Find{
1020 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1021 Filter: f,
1022 ReadPref: coll.readPreference,
1023 ReadConcern: rc,
1024 Session: sess,
1025 Clock: coll.client.clock,
1026 }
1027
1028 findOpts := make([]*options.FindOptions, len(opts))
1029 for i, opt := range opts {
1030 findOpts[i] = &options.FindOptions{
1031 AllowPartialResults: opt.AllowPartialResults,
1032 BatchSize: opt.BatchSize,
1033 Collation: opt.Collation,
1034 Comment: opt.Comment,
1035 CursorType: opt.CursorType,
1036 Hint: opt.Hint,
1037 Max: opt.Max,
1038 MaxAwaitTime: opt.MaxAwaitTime,
1039 Min: opt.Min,
1040 NoCursorTimeout: opt.NoCursorTimeout,
1041 OplogReplay: opt.OplogReplay,
1042 Projection: opt.Projection,
1043 ReturnKey: opt.ReturnKey,
1044 ShowRecordID: opt.ShowRecordID,
1045 Skip: opt.Skip,
1046 Snapshot: opt.Snapshot,
1047 Sort: opt.Sort,
1048 }
1049 }
1050
1051 batchCursor, err := driver.Find(
1052 ctx, cmd,
1053 coll.client.topology,
1054 coll.readSelector,
1055 coll.client.id,
1056 coll.client.topology.SessionPool,
1057 coll.registry,
1058 findOpts...,
1059 )
1060 if err != nil {
1061 return &SingleResult{err: replaceTopologyErr(err)}
1062 }
1063
1064 cursor, err := newCursor(batchCursor, coll.registry)
1065 return &SingleResult{cur: cursor, reg: coll.registry, err: replaceTopologyErr(err)}
1066}
1067
1068// FindOneAndDelete find a single document and deletes it, returning the
1069// original in result.
1070func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
1071 opts ...*options.FindOneAndDeleteOptions) *SingleResult {
1072
1073 if ctx == nil {
1074 ctx = context.Background()
1075 }
1076
1077 f, err := transformDocument(coll.registry, filter)
1078 if err != nil {
1079 return &SingleResult{err: err}
1080 }
1081
1082 sess := sessionFromContext(ctx)
1083
1084 err = coll.client.ValidSession(sess)
1085 if err != nil {
1086 return &SingleResult{err: err}
1087 }
1088
1089 oldns := coll.namespace()
1090 wc := coll.writeConcern
1091 if sess != nil && sess.TransactionRunning() {
1092 wc = nil
1093 }
1094
1095 cmd := command.FindOneAndDelete{
1096 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1097 Query: f,
1098 WriteConcern: wc,
1099 Session: sess,
1100 Clock: coll.client.clock,
1101 }
1102
1103 res, err := driver.FindOneAndDelete(
1104 ctx, cmd,
1105 coll.client.topology,
1106 coll.writeSelector,
1107 coll.client.id,
1108 coll.client.topology.SessionPool,
1109 coll.client.retryWrites,
1110 coll.registry,
1111 opts...,
1112 )
1113 if err != nil {
1114 return &SingleResult{err: replaceTopologyErr(err)}
1115 }
1116
1117 return &SingleResult{rdr: res.Value, reg: coll.registry}
1118}
1119
1120// FindOneAndReplace finds a single document and replaces it, returning either
1121// the original or the replaced document.
1122func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
1123 replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult {
1124
1125 if ctx == nil {
1126 ctx = context.Background()
1127 }
1128
1129 f, err := transformDocument(coll.registry, filter)
1130 if err != nil {
1131 return &SingleResult{err: err}
1132 }
1133
1134 r, err := transformDocument(coll.registry, replacement)
1135 if err != nil {
1136 return &SingleResult{err: err}
1137 }
1138
1139 if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
1140 return &SingleResult{err: errors.New("replacement document cannot contains keys beginning with '$")}
1141 }
1142
1143 sess := sessionFromContext(ctx)
1144
1145 err = coll.client.ValidSession(sess)
1146 if err != nil {
1147 return &SingleResult{err: err}
1148 }
1149
1150 wc := coll.writeConcern
1151 if sess != nil && sess.TransactionRunning() {
1152 wc = nil
1153 }
1154
1155 oldns := coll.namespace()
1156 cmd := command.FindOneAndReplace{
1157 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1158 Query: f,
1159 Replacement: r,
1160 WriteConcern: wc,
1161 Session: sess,
1162 Clock: coll.client.clock,
1163 }
1164
1165 res, err := driver.FindOneAndReplace(
1166 ctx, cmd,
1167 coll.client.topology,
1168 coll.writeSelector,
1169 coll.client.id,
1170 coll.client.topology.SessionPool,
1171 coll.client.retryWrites,
1172 coll.registry,
1173 opts...,
1174 )
1175 if err != nil {
1176 return &SingleResult{err: replaceTopologyErr(err)}
1177 }
1178
1179 return &SingleResult{rdr: res.Value, reg: coll.registry}
1180}
1181
1182// FindOneAndUpdate finds a single document and updates it, returning either
1183// the original or the updated.
1184func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
1185 update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult {
1186
1187 if ctx == nil {
1188 ctx = context.Background()
1189 }
1190
1191 f, err := transformDocument(coll.registry, filter)
1192 if err != nil {
1193 return &SingleResult{err: err}
1194 }
1195
1196 u, err := transformDocument(coll.registry, update)
1197 if err != nil {
1198 return &SingleResult{err: err}
1199 }
1200
1201 err = ensureDollarKey(u)
1202 if err != nil {
1203 return &SingleResult{
1204 err: err,
1205 }
1206 }
1207
1208 sess := sessionFromContext(ctx)
1209
1210 err = coll.client.ValidSession(sess)
1211 if err != nil {
1212 return &SingleResult{err: err}
1213 }
1214
1215 wc := coll.writeConcern
1216 if sess != nil && sess.TransactionRunning() {
1217 wc = nil
1218 }
1219
1220 oldns := coll.namespace()
1221 cmd := command.FindOneAndUpdate{
1222 NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
1223 Query: f,
1224 Update: u,
1225 WriteConcern: wc,
1226 Session: sess,
1227 Clock: coll.client.clock,
1228 }
1229
1230 res, err := driver.FindOneAndUpdate(
1231 ctx, cmd,
1232 coll.client.topology,
1233 coll.writeSelector,
1234 coll.client.id,
1235 coll.client.topology.SessionPool,
1236 coll.client.retryWrites,
1237 coll.registry,
1238 opts...,
1239 )
1240 if err != nil {
1241 return &SingleResult{err: replaceTopologyErr(err)}
1242 }
1243
1244 return &SingleResult{rdr: res.Value, reg: coll.registry}
1245}
1246
1247// Watch returns a change stream cursor used to receive notifications of changes to the collection.
1248//
1249// This method is preferred to running a raw aggregation with a $changeStream stage because it
1250// supports resumability in the case of some errors. The collection must have read concern majority or no read concern
1251// for a change stream to be created successfully.
1252func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
1253 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
1254 return newChangeStream(ctx, coll, pipeline, opts...)
1255}
1256
1257// Indexes returns the index view for this collection.
1258func (coll *Collection) Indexes() IndexView {
1259 return IndexView{coll: coll}
1260}
1261
1262// Drop drops this collection from database.
1263func (coll *Collection) Drop(ctx context.Context) error {
1264 if ctx == nil {
1265 ctx = context.Background()
1266 }
1267
1268 sess := sessionFromContext(ctx)
1269
1270 err := coll.client.ValidSession(sess)
1271 if err != nil {
1272 return err
1273 }
1274
1275 wc := coll.writeConcern
1276 if sess != nil && sess.TransactionRunning() {
1277 wc = nil
1278 }
1279
1280 cmd := command.DropCollection{
1281 DB: coll.db.name,
1282 Collection: coll.name,
1283 WriteConcern: wc,
1284 Session: sess,
1285 Clock: coll.client.clock,
1286 }
1287 _, err = driver.DropCollection(
1288 ctx, cmd,
1289 coll.client.topology,
1290 coll.writeSelector,
1291 coll.client.id,
1292 coll.client.topology.SessionPool,
1293 )
1294 if err != nil && !command.IsNotFound(err) {
1295 return replaceTopologyErr(err)
1296 }
1297 return nil
1298}