Don Newton | 379ae25 | 2019-04-01 12:17:06 -0400 | [diff] [blame^] | 1 | // Copyright (C) MongoDB, Inc. 2017-present. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | // not use this file except in compliance with the License. You may obtain |
| 5 | // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 6 | |
| 7 | package mongo |
| 8 | |
| 9 | import ( |
| 10 | "context" |
| 11 | |
| 12 | "github.com/mongodb/mongo-go-driver/bson/bsoncodec" |
| 13 | "github.com/mongodb/mongo-go-driver/mongo/options" |
| 14 | "github.com/mongodb/mongo-go-driver/mongo/readconcern" |
| 15 | "github.com/mongodb/mongo-go-driver/mongo/readpref" |
| 16 | "github.com/mongodb/mongo-go-driver/mongo/writeconcern" |
| 17 | "github.com/mongodb/mongo-go-driver/x/mongo/driver" |
| 18 | "github.com/mongodb/mongo-go-driver/x/network/command" |
| 19 | "github.com/mongodb/mongo-go-driver/x/network/description" |
| 20 | ) |
| 21 | |
| 22 | // Database performs operations on a given database. |
| 23 | type Database struct { |
| 24 | client *Client |
| 25 | name string |
| 26 | readConcern *readconcern.ReadConcern |
| 27 | writeConcern *writeconcern.WriteConcern |
| 28 | readPreference *readpref.ReadPref |
| 29 | readSelector description.ServerSelector |
| 30 | writeSelector description.ServerSelector |
| 31 | registry *bsoncodec.Registry |
| 32 | } |
| 33 | |
| 34 | func newDatabase(client *Client, name string, opts ...*options.DatabaseOptions) *Database { |
| 35 | dbOpt := options.MergeDatabaseOptions(opts...) |
| 36 | |
| 37 | rc := client.readConcern |
| 38 | if dbOpt.ReadConcern != nil { |
| 39 | rc = dbOpt.ReadConcern |
| 40 | } |
| 41 | |
| 42 | rp := client.readPreference |
| 43 | if dbOpt.ReadPreference != nil { |
| 44 | rp = dbOpt.ReadPreference |
| 45 | } |
| 46 | |
| 47 | wc := client.writeConcern |
| 48 | if dbOpt.WriteConcern != nil { |
| 49 | wc = dbOpt.WriteConcern |
| 50 | } |
| 51 | |
| 52 | db := &Database{ |
| 53 | client: client, |
| 54 | name: name, |
| 55 | readPreference: rp, |
| 56 | readConcern: rc, |
| 57 | writeConcern: wc, |
| 58 | registry: client.registry, |
| 59 | } |
| 60 | |
| 61 | db.readSelector = description.CompositeSelector([]description.ServerSelector{ |
| 62 | description.ReadPrefSelector(db.readPreference), |
| 63 | description.LatencySelector(db.client.localThreshold), |
| 64 | }) |
| 65 | |
| 66 | db.writeSelector = description.CompositeSelector([]description.ServerSelector{ |
| 67 | description.WriteSelector(), |
| 68 | description.LatencySelector(db.client.localThreshold), |
| 69 | }) |
| 70 | |
| 71 | return db |
| 72 | } |
| 73 | |
| 74 | // Client returns the Client the database was created from. |
| 75 | func (db *Database) Client() *Client { |
| 76 | return db.client |
| 77 | } |
| 78 | |
| 79 | // Name returns the name of the database. |
| 80 | func (db *Database) Name() string { |
| 81 | return db.name |
| 82 | } |
| 83 | |
| 84 | // Collection gets a handle for a given collection in the database. |
| 85 | func (db *Database) Collection(name string, opts ...*options.CollectionOptions) *Collection { |
| 86 | return newCollection(db, name, opts...) |
| 87 | } |
| 88 | |
| 89 | func (db *Database) processRunCommand(ctx context.Context, cmd interface{}, opts ...*options.RunCmdOptions) (command.Read, |
| 90 | description.ServerSelector, error) { |
| 91 | |
| 92 | if ctx == nil { |
| 93 | ctx = context.Background() |
| 94 | } |
| 95 | |
| 96 | sess := sessionFromContext(ctx) |
| 97 | runCmd := options.MergeRunCmdOptions(opts...) |
| 98 | |
| 99 | if err := db.client.ValidSession(sess); err != nil { |
| 100 | return command.Read{}, nil, err |
| 101 | } |
| 102 | |
| 103 | rp := runCmd.ReadPreference |
| 104 | if rp == nil { |
| 105 | if sess != nil && sess.TransactionRunning() { |
| 106 | rp = sess.CurrentRp // override with transaction read pref if specified |
| 107 | } |
| 108 | if rp == nil { |
| 109 | rp = readpref.Primary() // set to primary if nothing specified in options |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | runCmdDoc, err := transformDocument(db.registry, cmd) |
| 114 | if err != nil { |
| 115 | return command.Read{}, nil, err |
| 116 | } |
| 117 | |
| 118 | readSelect := description.CompositeSelector([]description.ServerSelector{ |
| 119 | description.ReadPrefSelector(rp), |
| 120 | description.LatencySelector(db.client.localThreshold), |
| 121 | }) |
| 122 | |
| 123 | return command.Read{ |
| 124 | DB: db.Name(), |
| 125 | Command: runCmdDoc, |
| 126 | ReadPref: rp, |
| 127 | Session: sess, |
| 128 | Clock: db.client.clock, |
| 129 | }, readSelect, nil |
| 130 | } |
| 131 | |
| 132 | // RunCommand runs a command on the database. A user can supply a custom |
| 133 | // context to this method, or nil to default to context.Background(). |
| 134 | func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) *SingleResult { |
| 135 | if ctx == nil { |
| 136 | ctx = context.Background() |
| 137 | } |
| 138 | |
| 139 | readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...) |
| 140 | if err != nil { |
| 141 | return &SingleResult{err: err} |
| 142 | } |
| 143 | |
| 144 | doc, err := driver.Read(ctx, |
| 145 | readCmd, |
| 146 | db.client.topology, |
| 147 | readSelect, |
| 148 | db.client.id, |
| 149 | db.client.topology.SessionPool, |
| 150 | ) |
| 151 | |
| 152 | return &SingleResult{err: replaceTopologyErr(err), rdr: doc, reg: db.registry} |
| 153 | } |
| 154 | |
| 155 | // RunCommandCursor runs a command on the database and returns a cursor over the resulting reader. A user can supply |
| 156 | // a custom context to this method, or nil to default to context.Background(). |
| 157 | func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) (*Cursor, error) { |
| 158 | if ctx == nil { |
| 159 | ctx = context.Background() |
| 160 | } |
| 161 | |
| 162 | readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...) |
| 163 | if err != nil { |
| 164 | return nil, err |
| 165 | } |
| 166 | |
| 167 | batchCursor, err := driver.ReadCursor( |
| 168 | ctx, |
| 169 | readCmd, |
| 170 | db.client.topology, |
| 171 | readSelect, |
| 172 | db.client.id, |
| 173 | db.client.topology.SessionPool, |
| 174 | ) |
| 175 | if err != nil { |
| 176 | return nil, replaceTopologyErr(err) |
| 177 | } |
| 178 | |
| 179 | cursor, err := newCursor(batchCursor, db.registry) |
| 180 | return cursor, replaceTopologyErr(err) |
| 181 | } |
| 182 | |
| 183 | // Drop drops this database from mongodb. |
| 184 | func (db *Database) Drop(ctx context.Context) error { |
| 185 | if ctx == nil { |
| 186 | ctx = context.Background() |
| 187 | } |
| 188 | |
| 189 | sess := sessionFromContext(ctx) |
| 190 | |
| 191 | err := db.client.ValidSession(sess) |
| 192 | if err != nil { |
| 193 | return err |
| 194 | } |
| 195 | |
| 196 | cmd := command.DropDatabase{ |
| 197 | DB: db.name, |
| 198 | Session: sess, |
| 199 | Clock: db.client.clock, |
| 200 | } |
| 201 | _, err = driver.DropDatabase( |
| 202 | ctx, cmd, |
| 203 | db.client.topology, |
| 204 | db.writeSelector, |
| 205 | db.client.id, |
| 206 | db.client.topology.SessionPool, |
| 207 | ) |
| 208 | if err != nil && !command.IsNotFound(err) { |
| 209 | return replaceTopologyErr(err) |
| 210 | } |
| 211 | return nil |
| 212 | } |
| 213 | |
| 214 | // ListCollections list collections from mongodb database. |
| 215 | func (db *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (*Cursor, error) { |
| 216 | if ctx == nil { |
| 217 | ctx = context.Background() |
| 218 | } |
| 219 | |
| 220 | sess := sessionFromContext(ctx) |
| 221 | |
| 222 | err := db.client.ValidSession(sess) |
| 223 | if err != nil { |
| 224 | return nil, err |
| 225 | } |
| 226 | |
| 227 | filterDoc, err := transformDocument(db.registry, filter) |
| 228 | if err != nil { |
| 229 | return nil, err |
| 230 | } |
| 231 | |
| 232 | cmd := command.ListCollections{ |
| 233 | DB: db.name, |
| 234 | Filter: filterDoc, |
| 235 | ReadPref: readpref.Primary(), // list collections must be run on a primary by default |
| 236 | Session: sess, |
| 237 | Clock: db.client.clock, |
| 238 | } |
| 239 | |
| 240 | readSelector := description.CompositeSelector([]description.ServerSelector{ |
| 241 | description.ReadPrefSelector(readpref.Primary()), |
| 242 | description.LatencySelector(db.client.localThreshold), |
| 243 | }) |
| 244 | batchCursor, err := driver.ListCollections( |
| 245 | ctx, cmd, |
| 246 | db.client.topology, |
| 247 | readSelector, |
| 248 | db.client.id, |
| 249 | db.client.topology.SessionPool, |
| 250 | opts..., |
| 251 | ) |
| 252 | if err != nil { |
| 253 | return nil, replaceTopologyErr(err) |
| 254 | } |
| 255 | |
| 256 | cursor, err := newCursor(batchCursor, db.registry) |
| 257 | return cursor, replaceTopologyErr(err) |
| 258 | } |
| 259 | |
| 260 | // ReadConcern returns the read concern of this database. |
| 261 | func (db *Database) ReadConcern() *readconcern.ReadConcern { |
| 262 | return db.readConcern |
| 263 | } |
| 264 | |
| 265 | // ReadPreference returns the read preference of this database. |
| 266 | func (db *Database) ReadPreference() *readpref.ReadPref { |
| 267 | return db.readPreference |
| 268 | } |
| 269 | |
| 270 | // WriteConcern returns the write concern of this database. |
| 271 | func (db *Database) WriteConcern() *writeconcern.WriteConcern { |
| 272 | return db.writeConcern |
| 273 | } |
| 274 | |
| 275 | // Watch returns a change stream cursor used to receive information of changes to the database. This method is preferred |
| 276 | // to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors. |
| 277 | // The database must have read concern majority or no read concern for a change stream to be created successfully. |
| 278 | func (db *Database) Watch(ctx context.Context, pipeline interface{}, |
| 279 | opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { |
| 280 | |
| 281 | return newDbChangeStream(ctx, db, pipeline, opts...) |
| 282 | } |