blob: 7984bc0583ad791a96be12c0d58aac3bb1e5fe03 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package mongo
8
9import (
10 "context"
11 "time"
12
13 "github.com/mongodb/mongo-go-driver/bson"
14 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
15 "github.com/mongodb/mongo-go-driver/mongo/options"
16 "github.com/mongodb/mongo-go-driver/mongo/readconcern"
17 "github.com/mongodb/mongo-go-driver/mongo/readpref"
18 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
19 "github.com/mongodb/mongo-go-driver/tag"
20 "github.com/mongodb/mongo-go-driver/x/mongo/driver"
21 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
22 "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
23 "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
24 "github.com/mongodb/mongo-go-driver/x/network/command"
25 "github.com/mongodb/mongo-go-driver/x/network/connstring"
26 "github.com/mongodb/mongo-go-driver/x/network/description"
27)
28
// defaultLocalThreshold is the local threshold every Client created in this
// file starts with; it is fed to description.LatencySelector when selecting a
// server for ListDatabases.
const defaultLocalThreshold time.Duration = 15 * time.Millisecond
30
31// Client performs operations on a given topology.
32type Client struct {
33 id uuid.UUID
34 topologyOptions []topology.Option
35 topology *topology.Topology
36 connString connstring.ConnString
37 localThreshold time.Duration
38 retryWrites bool
39 clock *session.ClusterClock
40 readPreference *readpref.ReadPref
41 readConcern *readconcern.ReadConcern
42 writeConcern *writeconcern.WriteConcern
43 registry *bsoncodec.Registry
44 marshaller BSONAppender
45}
46
47// Connect creates a new Client and then initializes it using the Connect method.
48func Connect(ctx context.Context, uri string, opts ...*options.ClientOptions) (*Client, error) {
49 c, err := NewClientWithOptions(uri, opts...)
50 if err != nil {
51 return nil, err
52 }
53 err = c.Connect(ctx)
54 if err != nil {
55 return nil, err
56 }
57 return c, nil
58}
59
60// NewClient creates a new client to connect to a cluster specified by the uri.
61func NewClient(uri string) (*Client, error) {
62 cs, err := connstring.Parse(uri)
63 if err != nil {
64 return nil, err
65 }
66
67 return newClient(cs)
68}
69
70// NewClientWithOptions creates a new client to connect to to a cluster specified by the connection
71// string and the options manually passed in. If the same option is configured in both the
72// connection string and the manual options, the manual option will be ignored.
73func NewClientWithOptions(uri string, opts ...*options.ClientOptions) (*Client, error) {
74 cs, err := connstring.Parse(uri)
75 if err != nil {
76 return nil, err
77 }
78
79 return newClient(cs, opts...)
80}
81
82// Connect initializes the Client by starting background monitoring goroutines.
83// This method must be called before a Client can be used.
84func (c *Client) Connect(ctx context.Context) error {
85 err := c.topology.Connect(ctx)
86 if err != nil {
87 return replaceTopologyErr(err)
88 }
89
90 return nil
91
92}
93
94// Disconnect closes sockets to the topology referenced by this Client. It will
95// shut down any monitoring goroutines, close the idle connection pool, and will
96// wait until all the in use connections have been returned to the connection
97// pool and closed before returning. If the context expires via cancellation,
98// deadline, or timeout before the in use connections have returned, the in use
99// connections will be closed, resulting in the failure of any in flight read
100// or write operations. If this method returns with no errors, all connections
101// associated with this Client have been closed.
102func (c *Client) Disconnect(ctx context.Context) error {
103 c.endSessions(ctx)
104 return replaceTopologyErr(c.topology.Disconnect(ctx))
105}
106
107// Ping verifies that the client can connect to the topology.
108// If readPreference is nil then will use the client's default read
109// preference.
110func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
111 if ctx == nil {
112 ctx = context.Background()
113 }
114
115 if rp == nil {
116 rp = c.readPreference
117 }
118
119 _, err := c.topology.SelectServer(ctx, description.ReadPrefSelector(rp))
120 return replaceTopologyErr(err)
121}
122
123// StartSession starts a new session.
124func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) {
125 if c.topology.SessionPool == nil {
126 return nil, ErrClientDisconnected
127 }
128
129 sopts := options.MergeSessionOptions(opts...)
130 coreOpts := &session.ClientOptions{
131 DefaultReadConcern: c.readConcern,
132 DefaultReadPreference: c.readPreference,
133 DefaultWriteConcern: c.writeConcern,
134 }
135 if sopts.CausalConsistency != nil {
136 coreOpts.CausalConsistency = sopts.CausalConsistency
137 }
138 if sopts.DefaultReadConcern != nil {
139 coreOpts.DefaultReadConcern = sopts.DefaultReadConcern
140 }
141 if sopts.DefaultWriteConcern != nil {
142 coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern
143 }
144 if sopts.DefaultReadPreference != nil {
145 coreOpts.DefaultReadPreference = sopts.DefaultReadPreference
146 }
147
148 sess, err := session.NewClientSession(c.topology.SessionPool, c.id, session.Explicit, coreOpts)
149 if err != nil {
150 return nil, replaceTopologyErr(err)
151 }
152
153 sess.RetryWrite = c.retryWrites
154
155 return &sessionImpl{
156 Client: sess,
157 topo: c.topology,
158 }, nil
159}
160
161func (c *Client) endSessions(ctx context.Context) {
162 if c.topology.SessionPool == nil {
163 return
164 }
165 cmd := command.EndSessions{
166 Clock: c.clock,
167 SessionIDs: c.topology.SessionPool.IDSlice(),
168 }
169
170 _, _ = driver.EndSessions(ctx, cmd, c.topology, description.ReadPrefSelector(readpref.PrimaryPreferred()))
171}
172
173func newClient(cs connstring.ConnString, opts ...*options.ClientOptions) (*Client, error) {
174 clientOpt := options.MergeClientOptions(cs, opts...)
175
176 client := &Client{
177 topologyOptions: clientOpt.TopologyOptions,
178 connString: clientOpt.ConnString,
179 localThreshold: defaultLocalThreshold,
180 readPreference: clientOpt.ReadPreference,
181 readConcern: clientOpt.ReadConcern,
182 writeConcern: clientOpt.WriteConcern,
183 registry: clientOpt.Registry,
184 }
185
186 if client.connString.RetryWritesSet {
187 client.retryWrites = client.connString.RetryWrites
188 }
189 if clientOpt.RetryWrites != nil {
190 client.retryWrites = *clientOpt.RetryWrites
191 }
192
193 clientID, err := uuid.New()
194 if err != nil {
195 return nil, err
196 }
197 client.id = clientID
198
199 topts := append(
200 client.topologyOptions,
201 topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return client.connString }),
202 topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
203 return append(opts, topology.WithClock(func(clock *session.ClusterClock) *session.ClusterClock {
204 return client.clock
205 }), topology.WithRegistry(func(registry *bsoncodec.Registry) *bsoncodec.Registry {
206 return client.registry
207 }))
208 }),
209 )
210 topo, err := topology.New(topts...)
211 if err != nil {
212 return nil, replaceTopologyErr(err)
213 }
214 client.topology = topo
215 client.clock = &session.ClusterClock{}
216
217 if client.readConcern == nil {
218 client.readConcern = readConcernFromConnString(&client.connString)
219
220 if client.readConcern == nil {
221 // no read concern in conn string
222 client.readConcern = readconcern.New()
223 }
224 }
225
226 if client.writeConcern == nil {
227 client.writeConcern = writeConcernFromConnString(&client.connString)
228 }
229 if client.readPreference == nil {
230 rp, err := readPreferenceFromConnString(&client.connString)
231 if err != nil {
232 return nil, err
233 }
234 if rp != nil {
235 client.readPreference = rp
236 } else {
237 client.readPreference = readpref.Primary()
238 }
239 }
240
241 if client.registry == nil {
242 client.registry = bson.DefaultRegistry
243 }
244 return client, nil
245}
246
247func readConcernFromConnString(cs *connstring.ConnString) *readconcern.ReadConcern {
248 if len(cs.ReadConcernLevel) == 0 {
249 return nil
250 }
251
252 rc := &readconcern.ReadConcern{}
253 readconcern.Level(cs.ReadConcernLevel)(rc)
254
255 return rc
256}
257
258func writeConcernFromConnString(cs *connstring.ConnString) *writeconcern.WriteConcern {
259 var wc *writeconcern.WriteConcern
260
261 if len(cs.WString) > 0 {
262 if wc == nil {
263 wc = writeconcern.New()
264 }
265
266 writeconcern.WTagSet(cs.WString)(wc)
267 } else if cs.WNumberSet {
268 if wc == nil {
269 wc = writeconcern.New()
270 }
271
272 writeconcern.W(cs.WNumber)(wc)
273 }
274
275 if cs.JSet {
276 if wc == nil {
277 wc = writeconcern.New()
278 }
279
280 writeconcern.J(cs.J)(wc)
281 }
282
283 if cs.WTimeoutSet {
284 if wc == nil {
285 wc = writeconcern.New()
286 }
287
288 writeconcern.WTimeout(cs.WTimeout)(wc)
289 }
290
291 return wc
292}
293
294func readPreferenceFromConnString(cs *connstring.ConnString) (*readpref.ReadPref, error) {
295 var rp *readpref.ReadPref
296 var err error
297 options := make([]readpref.Option, 0, 1)
298
299 tagSets := tag.NewTagSetsFromMaps(cs.ReadPreferenceTagSets)
300 if len(tagSets) > 0 {
301 options = append(options, readpref.WithTagSets(tagSets...))
302 }
303
304 if cs.MaxStaleness != 0 {
305 options = append(options, readpref.WithMaxStaleness(cs.MaxStaleness))
306 }
307
308 if len(cs.ReadPreference) > 0 {
309 if rp == nil {
310 mode, _ := readpref.ModeFromString(cs.ReadPreference)
311 rp, err = readpref.New(mode, options...)
312 if err != nil {
313 return nil, err
314 }
315 }
316 }
317
318 return rp, nil
319}
320
321// ValidSession returns an error if the session doesn't belong to the client
322func (c *Client) ValidSession(sess *session.Client) error {
323 if sess != nil && !uuid.Equal(sess.ClientID, c.id) {
324 return ErrWrongClient
325 }
326 return nil
327}
328
329// Database returns a handle for a given database.
330func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database {
331 return newDatabase(c, name, opts...)
332}
333
334// ConnectionString returns the connection string of the cluster the client is connected to.
335func (c *Client) ConnectionString() string {
336 return c.connString.Original
337}
338
339// ListDatabases returns a ListDatabasesResult.
340func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) {
341 if ctx == nil {
342 ctx = context.Background()
343 }
344
345 sess := sessionFromContext(ctx)
346
347 err := c.ValidSession(sess)
348 if err != nil {
349 return ListDatabasesResult{}, err
350 }
351
352 f, err := transformDocument(c.registry, filter)
353 if err != nil {
354 return ListDatabasesResult{}, err
355 }
356
357 cmd := command.ListDatabases{
358 Filter: f,
359 Session: sess,
360 Clock: c.clock,
361 }
362
363 readSelector := description.CompositeSelector([]description.ServerSelector{
364 description.ReadPrefSelector(readpref.Primary()),
365 description.LatencySelector(c.localThreshold),
366 })
367 res, err := driver.ListDatabases(
368 ctx, cmd,
369 c.topology,
370 readSelector,
371 c.id,
372 c.topology.SessionPool,
373 opts...,
374 )
375 if err != nil {
376 return ListDatabasesResult{}, replaceTopologyErr(err)
377 }
378
379 return (ListDatabasesResult{}).fromResult(res), nil
380}
381
382// ListDatabaseNames returns a slice containing the names of all of the databases on the server.
383func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) {
384 opts = append(opts, options.ListDatabases().SetNameOnly(true))
385
386 res, err := c.ListDatabases(ctx, filter, opts...)
387 if err != nil {
388 return nil, err
389 }
390
391 names := make([]string, 0)
392 for _, spec := range res.Databases {
393 names = append(names, spec.Name)
394 }
395
396 return names, nil
397}
398
399// WithSession allows a user to start a session themselves and manage
400// its lifetime. The only way to provide a session to a CRUD method is
401// to invoke that CRUD method with the mongo.SessionContext within the
402// closure. The mongo.SessionContext can be used as a regular context,
403// so methods like context.WithDeadline and context.WithTimeout are
404// supported.
405//
406// If the context.Context already has a mongo.Session attached, that
407// mongo.Session will be replaced with the one provided.
408//
409// Errors returned from the closure are transparently returned from
410// this function.
411func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error {
412 return fn(contextWithSession(ctx, sess))
413}
414
415// UseSession creates a default session, that is only valid for the
416// lifetime of the closure. No cleanup outside of closing the session
417// is done upon exiting the closure. This means that an outstanding
418// transaction will be aborted, even if the closure returns an error.
419//
420// If ctx already contains a mongo.Session, that mongo.Session will be
421// replaced with the newly created mongo.Session.
422//
423// Errors returned from the closure are transparently returned from
424// this method.
425func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error {
426 return c.UseSessionWithOptions(ctx, options.Session(), fn)
427}
428
429// UseSessionWithOptions works like UseSession but allows the caller
430// to specify the options used to create the session.
431func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error {
432 defaultSess, err := c.StartSession(opts)
433 if err != nil {
434 return err
435 }
436
437 defer defaultSess.EndSession(ctx)
438
439 sessCtx := sessionContext{
440 Context: context.WithValue(ctx, sessionKey{}, defaultSess),
441 Session: defaultSess,
442 }
443
444 return fn(sessCtx)
445}
446
447// Watch returns a change stream cursor used to receive information of changes to the client. This method is preferred
448// to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors.
449// The client must have read concern majority or no read concern for a change stream to be created successfully.
450func (c *Client) Watch(ctx context.Context, pipeline interface{},
451 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
452
453 return newClientChangeStream(ctx, c, pipeline, opts...)
454}