| // Copyright (C) MongoDB, Inc. 2017-present. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. You may obtain |
| // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| |
| package mongo |
| |
| import ( |
| "context" |
| |
| "github.com/mongodb/mongo-go-driver/bson/bsoncodec" |
| "github.com/mongodb/mongo-go-driver/mongo/options" |
| "github.com/mongodb/mongo-go-driver/mongo/readconcern" |
| "github.com/mongodb/mongo-go-driver/mongo/readpref" |
| "github.com/mongodb/mongo-go-driver/mongo/writeconcern" |
| "github.com/mongodb/mongo-go-driver/x/mongo/driver" |
| "github.com/mongodb/mongo-go-driver/x/network/command" |
| "github.com/mongodb/mongo-go-driver/x/network/description" |
| ) |
| |
| // Database performs operations on a given database. |
| type Database struct { |
| client *Client |
| name string |
| readConcern *readconcern.ReadConcern |
| writeConcern *writeconcern.WriteConcern |
| readPreference *readpref.ReadPref |
| readSelector description.ServerSelector |
| writeSelector description.ServerSelector |
| registry *bsoncodec.Registry |
| } |
| |
| func newDatabase(client *Client, name string, opts ...*options.DatabaseOptions) *Database { |
| dbOpt := options.MergeDatabaseOptions(opts...) |
| |
| rc := client.readConcern |
| if dbOpt.ReadConcern != nil { |
| rc = dbOpt.ReadConcern |
| } |
| |
| rp := client.readPreference |
| if dbOpt.ReadPreference != nil { |
| rp = dbOpt.ReadPreference |
| } |
| |
| wc := client.writeConcern |
| if dbOpt.WriteConcern != nil { |
| wc = dbOpt.WriteConcern |
| } |
| |
| db := &Database{ |
| client: client, |
| name: name, |
| readPreference: rp, |
| readConcern: rc, |
| writeConcern: wc, |
| registry: client.registry, |
| } |
| |
| db.readSelector = description.CompositeSelector([]description.ServerSelector{ |
| description.ReadPrefSelector(db.readPreference), |
| description.LatencySelector(db.client.localThreshold), |
| }) |
| |
| db.writeSelector = description.CompositeSelector([]description.ServerSelector{ |
| description.WriteSelector(), |
| description.LatencySelector(db.client.localThreshold), |
| }) |
| |
| return db |
| } |
| |
| // Client returns the Client the database was created from. |
| func (db *Database) Client() *Client { |
| return db.client |
| } |
| |
| // Name returns the name of the database. |
| func (db *Database) Name() string { |
| return db.name |
| } |
| |
| // Collection gets a handle for a given collection in the database. |
| func (db *Database) Collection(name string, opts ...*options.CollectionOptions) *Collection { |
| return newCollection(db, name, opts...) |
| } |
| |
| func (db *Database) processRunCommand(ctx context.Context, cmd interface{}, opts ...*options.RunCmdOptions) (command.Read, |
| description.ServerSelector, error) { |
| |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| |
| sess := sessionFromContext(ctx) |
| runCmd := options.MergeRunCmdOptions(opts...) |
| |
| if err := db.client.ValidSession(sess); err != nil { |
| return command.Read{}, nil, err |
| } |
| |
| rp := runCmd.ReadPreference |
| if rp == nil { |
| if sess != nil && sess.TransactionRunning() { |
| rp = sess.CurrentRp // override with transaction read pref if specified |
| } |
| if rp == nil { |
| rp = readpref.Primary() // set to primary if nothing specified in options |
| } |
| } |
| |
| runCmdDoc, err := transformDocument(db.registry, cmd) |
| if err != nil { |
| return command.Read{}, nil, err |
| } |
| |
| readSelect := description.CompositeSelector([]description.ServerSelector{ |
| description.ReadPrefSelector(rp), |
| description.LatencySelector(db.client.localThreshold), |
| }) |
| |
| return command.Read{ |
| DB: db.Name(), |
| Command: runCmdDoc, |
| ReadPref: rp, |
| Session: sess, |
| Clock: db.client.clock, |
| }, readSelect, nil |
| } |
| |
| // RunCommand runs a command on the database. A user can supply a custom |
| // context to this method, or nil to default to context.Background(). |
| func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) *SingleResult { |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| |
| readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...) |
| if err != nil { |
| return &SingleResult{err: err} |
| } |
| |
| doc, err := driver.Read(ctx, |
| readCmd, |
| db.client.topology, |
| readSelect, |
| db.client.id, |
| db.client.topology.SessionPool, |
| ) |
| |
| return &SingleResult{err: replaceTopologyErr(err), rdr: doc, reg: db.registry} |
| } |
| |
| // RunCommandCursor runs a command on the database and returns a cursor over the resulting reader. A user can supply |
| // a custom context to this method, or nil to default to context.Background(). |
| func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) (*Cursor, error) { |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| |
| readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...) |
| if err != nil { |
| return nil, err |
| } |
| |
| batchCursor, err := driver.ReadCursor( |
| ctx, |
| readCmd, |
| db.client.topology, |
| readSelect, |
| db.client.id, |
| db.client.topology.SessionPool, |
| ) |
| if err != nil { |
| return nil, replaceTopologyErr(err) |
| } |
| |
| cursor, err := newCursor(batchCursor, db.registry) |
| return cursor, replaceTopologyErr(err) |
| } |
| |
| // Drop drops this database from mongodb. |
| func (db *Database) Drop(ctx context.Context) error { |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| |
| sess := sessionFromContext(ctx) |
| |
| err := db.client.ValidSession(sess) |
| if err != nil { |
| return err |
| } |
| |
| cmd := command.DropDatabase{ |
| DB: db.name, |
| Session: sess, |
| Clock: db.client.clock, |
| } |
| _, err = driver.DropDatabase( |
| ctx, cmd, |
| db.client.topology, |
| db.writeSelector, |
| db.client.id, |
| db.client.topology.SessionPool, |
| ) |
| if err != nil && !command.IsNotFound(err) { |
| return replaceTopologyErr(err) |
| } |
| return nil |
| } |
| |
| // ListCollections list collections from mongodb database. |
| func (db *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (*Cursor, error) { |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| |
| sess := sessionFromContext(ctx) |
| |
| err := db.client.ValidSession(sess) |
| if err != nil { |
| return nil, err |
| } |
| |
| filterDoc, err := transformDocument(db.registry, filter) |
| if err != nil { |
| return nil, err |
| } |
| |
| cmd := command.ListCollections{ |
| DB: db.name, |
| Filter: filterDoc, |
| ReadPref: readpref.Primary(), // list collections must be run on a primary by default |
| Session: sess, |
| Clock: db.client.clock, |
| } |
| |
| readSelector := description.CompositeSelector([]description.ServerSelector{ |
| description.ReadPrefSelector(readpref.Primary()), |
| description.LatencySelector(db.client.localThreshold), |
| }) |
| batchCursor, err := driver.ListCollections( |
| ctx, cmd, |
| db.client.topology, |
| readSelector, |
| db.client.id, |
| db.client.topology.SessionPool, |
| opts..., |
| ) |
| if err != nil { |
| return nil, replaceTopologyErr(err) |
| } |
| |
| cursor, err := newCursor(batchCursor, db.registry) |
| return cursor, replaceTopologyErr(err) |
| } |
| |
| // ReadConcern returns the read concern of this database. |
| func (db *Database) ReadConcern() *readconcern.ReadConcern { |
| return db.readConcern |
| } |
| |
| // ReadPreference returns the read preference of this database. |
| func (db *Database) ReadPreference() *readpref.ReadPref { |
| return db.readPreference |
| } |
| |
| // WriteConcern returns the write concern of this database. |
| func (db *Database) WriteConcern() *writeconcern.WriteConcern { |
| return db.writeConcern |
| } |
| |
| // Watch returns a change stream cursor used to receive information of changes to the database. This method is preferred |
| // to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors. |
| // The database must have read concern majority or no read concern for a change stream to be created successfully. |
| func (db *Database) Watch(ctx context.Context, pipeline interface{}, |
| opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { |
| |
| return newDbChangeStream(ctx, db, pipeline, opts...) |
| } |