blob: 9575e0691136a816470714344e673d0cfcfeba30 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10 "context"
11
12 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
13 "github.com/mongodb/mongo-go-driver/mongo/options"
14 "github.com/mongodb/mongo-go-driver/mongo/readconcern"
15 "github.com/mongodb/mongo-go-driver/mongo/readpref"
16 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
17 "github.com/mongodb/mongo-go-driver/x/mongo/driver"
18 "github.com/mongodb/mongo-go-driver/x/network/command"
19 "github.com/mongodb/mongo-go-driver/x/network/description"
20)
21
22// Database performs operations on a given database.
23type Database struct {
24 client *Client
25 name string
26 readConcern *readconcern.ReadConcern
27 writeConcern *writeconcern.WriteConcern
28 readPreference *readpref.ReadPref
29 readSelector description.ServerSelector
30 writeSelector description.ServerSelector
31 registry *bsoncodec.Registry
32}
33
34func newDatabase(client *Client, name string, opts ...*options.DatabaseOptions) *Database {
35 dbOpt := options.MergeDatabaseOptions(opts...)
36
37 rc := client.readConcern
38 if dbOpt.ReadConcern != nil {
39 rc = dbOpt.ReadConcern
40 }
41
42 rp := client.readPreference
43 if dbOpt.ReadPreference != nil {
44 rp = dbOpt.ReadPreference
45 }
46
47 wc := client.writeConcern
48 if dbOpt.WriteConcern != nil {
49 wc = dbOpt.WriteConcern
50 }
51
52 db := &Database{
53 client: client,
54 name: name,
55 readPreference: rp,
56 readConcern: rc,
57 writeConcern: wc,
58 registry: client.registry,
59 }
60
61 db.readSelector = description.CompositeSelector([]description.ServerSelector{
62 description.ReadPrefSelector(db.readPreference),
63 description.LatencySelector(db.client.localThreshold),
64 })
65
66 db.writeSelector = description.CompositeSelector([]description.ServerSelector{
67 description.WriteSelector(),
68 description.LatencySelector(db.client.localThreshold),
69 })
70
71 return db
72}
73
74// Client returns the Client the database was created from.
75func (db *Database) Client() *Client {
76 return db.client
77}
78
79// Name returns the name of the database.
80func (db *Database) Name() string {
81 return db.name
82}
83
84// Collection gets a handle for a given collection in the database.
85func (db *Database) Collection(name string, opts ...*options.CollectionOptions) *Collection {
86 return newCollection(db, name, opts...)
87}
88
89func (db *Database) processRunCommand(ctx context.Context, cmd interface{}, opts ...*options.RunCmdOptions) (command.Read,
90 description.ServerSelector, error) {
91
92 if ctx == nil {
93 ctx = context.Background()
94 }
95
96 sess := sessionFromContext(ctx)
97 runCmd := options.MergeRunCmdOptions(opts...)
98
99 if err := db.client.ValidSession(sess); err != nil {
100 return command.Read{}, nil, err
101 }
102
103 rp := runCmd.ReadPreference
104 if rp == nil {
105 if sess != nil && sess.TransactionRunning() {
106 rp = sess.CurrentRp // override with transaction read pref if specified
107 }
108 if rp == nil {
109 rp = readpref.Primary() // set to primary if nothing specified in options
110 }
111 }
112
113 runCmdDoc, err := transformDocument(db.registry, cmd)
114 if err != nil {
115 return command.Read{}, nil, err
116 }
117
118 readSelect := description.CompositeSelector([]description.ServerSelector{
119 description.ReadPrefSelector(rp),
120 description.LatencySelector(db.client.localThreshold),
121 })
122
123 return command.Read{
124 DB: db.Name(),
125 Command: runCmdDoc,
126 ReadPref: rp,
127 Session: sess,
128 Clock: db.client.clock,
129 }, readSelect, nil
130}
131
132// RunCommand runs a command on the database. A user can supply a custom
133// context to this method, or nil to default to context.Background().
134func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) *SingleResult {
135 if ctx == nil {
136 ctx = context.Background()
137 }
138
139 readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...)
140 if err != nil {
141 return &SingleResult{err: err}
142 }
143
144 doc, err := driver.Read(ctx,
145 readCmd,
146 db.client.topology,
147 readSelect,
148 db.client.id,
149 db.client.topology.SessionPool,
150 )
151
152 return &SingleResult{err: replaceTopologyErr(err), rdr: doc, reg: db.registry}
153}
154
155// RunCommandCursor runs a command on the database and returns a cursor over the resulting reader. A user can supply
156// a custom context to this method, or nil to default to context.Background().
157func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) (*Cursor, error) {
158 if ctx == nil {
159 ctx = context.Background()
160 }
161
162 readCmd, readSelect, err := db.processRunCommand(ctx, runCommand, opts...)
163 if err != nil {
164 return nil, err
165 }
166
167 batchCursor, err := driver.ReadCursor(
168 ctx,
169 readCmd,
170 db.client.topology,
171 readSelect,
172 db.client.id,
173 db.client.topology.SessionPool,
174 )
175 if err != nil {
176 return nil, replaceTopologyErr(err)
177 }
178
179 cursor, err := newCursor(batchCursor, db.registry)
180 return cursor, replaceTopologyErr(err)
181}
182
183// Drop drops this database from mongodb.
184func (db *Database) Drop(ctx context.Context) error {
185 if ctx == nil {
186 ctx = context.Background()
187 }
188
189 sess := sessionFromContext(ctx)
190
191 err := db.client.ValidSession(sess)
192 if err != nil {
193 return err
194 }
195
196 cmd := command.DropDatabase{
197 DB: db.name,
198 Session: sess,
199 Clock: db.client.clock,
200 }
201 _, err = driver.DropDatabase(
202 ctx, cmd,
203 db.client.topology,
204 db.writeSelector,
205 db.client.id,
206 db.client.topology.SessionPool,
207 )
208 if err != nil && !command.IsNotFound(err) {
209 return replaceTopologyErr(err)
210 }
211 return nil
212}
213
214// ListCollections list collections from mongodb database.
215func (db *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (*Cursor, error) {
216 if ctx == nil {
217 ctx = context.Background()
218 }
219
220 sess := sessionFromContext(ctx)
221
222 err := db.client.ValidSession(sess)
223 if err != nil {
224 return nil, err
225 }
226
227 filterDoc, err := transformDocument(db.registry, filter)
228 if err != nil {
229 return nil, err
230 }
231
232 cmd := command.ListCollections{
233 DB: db.name,
234 Filter: filterDoc,
235 ReadPref: readpref.Primary(), // list collections must be run on a primary by default
236 Session: sess,
237 Clock: db.client.clock,
238 }
239
240 readSelector := description.CompositeSelector([]description.ServerSelector{
241 description.ReadPrefSelector(readpref.Primary()),
242 description.LatencySelector(db.client.localThreshold),
243 })
244 batchCursor, err := driver.ListCollections(
245 ctx, cmd,
246 db.client.topology,
247 readSelector,
248 db.client.id,
249 db.client.topology.SessionPool,
250 opts...,
251 )
252 if err != nil {
253 return nil, replaceTopologyErr(err)
254 }
255
256 cursor, err := newCursor(batchCursor, db.registry)
257 return cursor, replaceTopologyErr(err)
258}
259
260// ReadConcern returns the read concern of this database.
261func (db *Database) ReadConcern() *readconcern.ReadConcern {
262 return db.readConcern
263}
264
265// ReadPreference returns the read preference of this database.
266func (db *Database) ReadPreference() *readpref.ReadPref {
267 return db.readPreference
268}
269
270// WriteConcern returns the write concern of this database.
271func (db *Database) WriteConcern() *writeconcern.WriteConcern {
272 return db.writeConcern
273}
274
275// Watch returns a change stream cursor used to receive information of changes to the database. This method is preferred
276// to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors.
277// The database must have read concern majority or no read concern for a change stream to be created successfully.
278func (db *Database) Watch(ctx context.Context, pipeline interface{},
279 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
280
281 return newDbChangeStream(ctx, db, pipeline, opts...)
282}