Don Newton | 379ae25 | 2019-04-01 12:17:06 -0400 | [diff] [blame^] | 1 | // Copyright (C) MongoDB, Inc. 2017-present. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | // not use this file except in compliance with the License. You may obtain |
| 5 | // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 6 | |
| 7 | package mongo |
| 8 | |
| 9 | import ( |
| 10 | "context" |
| 11 | "time" |
| 12 | |
| 13 | "github.com/mongodb/mongo-go-driver/bson" |
| 14 | "github.com/mongodb/mongo-go-driver/bson/bsoncodec" |
| 15 | "github.com/mongodb/mongo-go-driver/mongo/options" |
| 16 | "github.com/mongodb/mongo-go-driver/mongo/readconcern" |
| 17 | "github.com/mongodb/mongo-go-driver/mongo/readpref" |
| 18 | "github.com/mongodb/mongo-go-driver/mongo/writeconcern" |
| 19 | "github.com/mongodb/mongo-go-driver/tag" |
| 20 | "github.com/mongodb/mongo-go-driver/x/mongo/driver" |
| 21 | "github.com/mongodb/mongo-go-driver/x/mongo/driver/session" |
| 22 | "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology" |
| 23 | "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid" |
| 24 | "github.com/mongodb/mongo-go-driver/x/network/command" |
| 25 | "github.com/mongodb/mongo-go-driver/x/network/connstring" |
| 26 | "github.com/mongodb/mongo-go-driver/x/network/description" |
| 27 | ) |
| 28 | |
| 29 | const defaultLocalThreshold = 15 * time.Millisecond |
| 30 | |
| 31 | // Client performs operations on a given topology. |
| 32 | type Client struct { |
| 33 | id uuid.UUID |
| 34 | topologyOptions []topology.Option |
| 35 | topology *topology.Topology |
| 36 | connString connstring.ConnString |
| 37 | localThreshold time.Duration |
| 38 | retryWrites bool |
| 39 | clock *session.ClusterClock |
| 40 | readPreference *readpref.ReadPref |
| 41 | readConcern *readconcern.ReadConcern |
| 42 | writeConcern *writeconcern.WriteConcern |
| 43 | registry *bsoncodec.Registry |
| 44 | marshaller BSONAppender |
| 45 | } |
| 46 | |
| 47 | // Connect creates a new Client and then initializes it using the Connect method. |
| 48 | func Connect(ctx context.Context, uri string, opts ...*options.ClientOptions) (*Client, error) { |
| 49 | c, err := NewClientWithOptions(uri, opts...) |
| 50 | if err != nil { |
| 51 | return nil, err |
| 52 | } |
| 53 | err = c.Connect(ctx) |
| 54 | if err != nil { |
| 55 | return nil, err |
| 56 | } |
| 57 | return c, nil |
| 58 | } |
| 59 | |
| 60 | // NewClient creates a new client to connect to a cluster specified by the uri. |
| 61 | func NewClient(uri string) (*Client, error) { |
| 62 | cs, err := connstring.Parse(uri) |
| 63 | if err != nil { |
| 64 | return nil, err |
| 65 | } |
| 66 | |
| 67 | return newClient(cs) |
| 68 | } |
| 69 | |
| 70 | // NewClientWithOptions creates a new client to connect to to a cluster specified by the connection |
| 71 | // string and the options manually passed in. If the same option is configured in both the |
| 72 | // connection string and the manual options, the manual option will be ignored. |
| 73 | func NewClientWithOptions(uri string, opts ...*options.ClientOptions) (*Client, error) { |
| 74 | cs, err := connstring.Parse(uri) |
| 75 | if err != nil { |
| 76 | return nil, err |
| 77 | } |
| 78 | |
| 79 | return newClient(cs, opts...) |
| 80 | } |
| 81 | |
| 82 | // Connect initializes the Client by starting background monitoring goroutines. |
| 83 | // This method must be called before a Client can be used. |
| 84 | func (c *Client) Connect(ctx context.Context) error { |
| 85 | err := c.topology.Connect(ctx) |
| 86 | if err != nil { |
| 87 | return replaceTopologyErr(err) |
| 88 | } |
| 89 | |
| 90 | return nil |
| 91 | |
| 92 | } |
| 93 | |
| 94 | // Disconnect closes sockets to the topology referenced by this Client. It will |
| 95 | // shut down any monitoring goroutines, close the idle connection pool, and will |
| 96 | // wait until all the in use connections have been returned to the connection |
| 97 | // pool and closed before returning. If the context expires via cancellation, |
| 98 | // deadline, or timeout before the in use connections have returned, the in use |
| 99 | // connections will be closed, resulting in the failure of any in flight read |
| 100 | // or write operations. If this method returns with no errors, all connections |
| 101 | // associated with this Client have been closed. |
| 102 | func (c *Client) Disconnect(ctx context.Context) error { |
| 103 | c.endSessions(ctx) |
| 104 | return replaceTopologyErr(c.topology.Disconnect(ctx)) |
| 105 | } |
| 106 | |
| 107 | // Ping verifies that the client can connect to the topology. |
| 108 | // If readPreference is nil then will use the client's default read |
| 109 | // preference. |
| 110 | func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { |
| 111 | if ctx == nil { |
| 112 | ctx = context.Background() |
| 113 | } |
| 114 | |
| 115 | if rp == nil { |
| 116 | rp = c.readPreference |
| 117 | } |
| 118 | |
| 119 | _, err := c.topology.SelectServer(ctx, description.ReadPrefSelector(rp)) |
| 120 | return replaceTopologyErr(err) |
| 121 | } |
| 122 | |
| 123 | // StartSession starts a new session. |
| 124 | func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) { |
| 125 | if c.topology.SessionPool == nil { |
| 126 | return nil, ErrClientDisconnected |
| 127 | } |
| 128 | |
| 129 | sopts := options.MergeSessionOptions(opts...) |
| 130 | coreOpts := &session.ClientOptions{ |
| 131 | DefaultReadConcern: c.readConcern, |
| 132 | DefaultReadPreference: c.readPreference, |
| 133 | DefaultWriteConcern: c.writeConcern, |
| 134 | } |
| 135 | if sopts.CausalConsistency != nil { |
| 136 | coreOpts.CausalConsistency = sopts.CausalConsistency |
| 137 | } |
| 138 | if sopts.DefaultReadConcern != nil { |
| 139 | coreOpts.DefaultReadConcern = sopts.DefaultReadConcern |
| 140 | } |
| 141 | if sopts.DefaultWriteConcern != nil { |
| 142 | coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern |
| 143 | } |
| 144 | if sopts.DefaultReadPreference != nil { |
| 145 | coreOpts.DefaultReadPreference = sopts.DefaultReadPreference |
| 146 | } |
| 147 | |
| 148 | sess, err := session.NewClientSession(c.topology.SessionPool, c.id, session.Explicit, coreOpts) |
| 149 | if err != nil { |
| 150 | return nil, replaceTopologyErr(err) |
| 151 | } |
| 152 | |
| 153 | sess.RetryWrite = c.retryWrites |
| 154 | |
| 155 | return &sessionImpl{ |
| 156 | Client: sess, |
| 157 | topo: c.topology, |
| 158 | }, nil |
| 159 | } |
| 160 | |
| 161 | func (c *Client) endSessions(ctx context.Context) { |
| 162 | if c.topology.SessionPool == nil { |
| 163 | return |
| 164 | } |
| 165 | cmd := command.EndSessions{ |
| 166 | Clock: c.clock, |
| 167 | SessionIDs: c.topology.SessionPool.IDSlice(), |
| 168 | } |
| 169 | |
| 170 | _, _ = driver.EndSessions(ctx, cmd, c.topology, description.ReadPrefSelector(readpref.PrimaryPreferred())) |
| 171 | } |
| 172 | |
| 173 | func newClient(cs connstring.ConnString, opts ...*options.ClientOptions) (*Client, error) { |
| 174 | clientOpt := options.MergeClientOptions(cs, opts...) |
| 175 | |
| 176 | client := &Client{ |
| 177 | topologyOptions: clientOpt.TopologyOptions, |
| 178 | connString: clientOpt.ConnString, |
| 179 | localThreshold: defaultLocalThreshold, |
| 180 | readPreference: clientOpt.ReadPreference, |
| 181 | readConcern: clientOpt.ReadConcern, |
| 182 | writeConcern: clientOpt.WriteConcern, |
| 183 | registry: clientOpt.Registry, |
| 184 | } |
| 185 | |
| 186 | if client.connString.RetryWritesSet { |
| 187 | client.retryWrites = client.connString.RetryWrites |
| 188 | } |
| 189 | if clientOpt.RetryWrites != nil { |
| 190 | client.retryWrites = *clientOpt.RetryWrites |
| 191 | } |
| 192 | |
| 193 | clientID, err := uuid.New() |
| 194 | if err != nil { |
| 195 | return nil, err |
| 196 | } |
| 197 | client.id = clientID |
| 198 | |
| 199 | topts := append( |
| 200 | client.topologyOptions, |
| 201 | topology.WithConnString(func(connstring.ConnString) connstring.ConnString { return client.connString }), |
| 202 | topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption { |
| 203 | return append(opts, topology.WithClock(func(clock *session.ClusterClock) *session.ClusterClock { |
| 204 | return client.clock |
| 205 | }), topology.WithRegistry(func(registry *bsoncodec.Registry) *bsoncodec.Registry { |
| 206 | return client.registry |
| 207 | })) |
| 208 | }), |
| 209 | ) |
| 210 | topo, err := topology.New(topts...) |
| 211 | if err != nil { |
| 212 | return nil, replaceTopologyErr(err) |
| 213 | } |
| 214 | client.topology = topo |
| 215 | client.clock = &session.ClusterClock{} |
| 216 | |
| 217 | if client.readConcern == nil { |
| 218 | client.readConcern = readConcernFromConnString(&client.connString) |
| 219 | |
| 220 | if client.readConcern == nil { |
| 221 | // no read concern in conn string |
| 222 | client.readConcern = readconcern.New() |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | if client.writeConcern == nil { |
| 227 | client.writeConcern = writeConcernFromConnString(&client.connString) |
| 228 | } |
| 229 | if client.readPreference == nil { |
| 230 | rp, err := readPreferenceFromConnString(&client.connString) |
| 231 | if err != nil { |
| 232 | return nil, err |
| 233 | } |
| 234 | if rp != nil { |
| 235 | client.readPreference = rp |
| 236 | } else { |
| 237 | client.readPreference = readpref.Primary() |
| 238 | } |
| 239 | } |
| 240 | |
| 241 | if client.registry == nil { |
| 242 | client.registry = bson.DefaultRegistry |
| 243 | } |
| 244 | return client, nil |
| 245 | } |
| 246 | |
| 247 | func readConcernFromConnString(cs *connstring.ConnString) *readconcern.ReadConcern { |
| 248 | if len(cs.ReadConcernLevel) == 0 { |
| 249 | return nil |
| 250 | } |
| 251 | |
| 252 | rc := &readconcern.ReadConcern{} |
| 253 | readconcern.Level(cs.ReadConcernLevel)(rc) |
| 254 | |
| 255 | return rc |
| 256 | } |
| 257 | |
| 258 | func writeConcernFromConnString(cs *connstring.ConnString) *writeconcern.WriteConcern { |
| 259 | var wc *writeconcern.WriteConcern |
| 260 | |
| 261 | if len(cs.WString) > 0 { |
| 262 | if wc == nil { |
| 263 | wc = writeconcern.New() |
| 264 | } |
| 265 | |
| 266 | writeconcern.WTagSet(cs.WString)(wc) |
| 267 | } else if cs.WNumberSet { |
| 268 | if wc == nil { |
| 269 | wc = writeconcern.New() |
| 270 | } |
| 271 | |
| 272 | writeconcern.W(cs.WNumber)(wc) |
| 273 | } |
| 274 | |
| 275 | if cs.JSet { |
| 276 | if wc == nil { |
| 277 | wc = writeconcern.New() |
| 278 | } |
| 279 | |
| 280 | writeconcern.J(cs.J)(wc) |
| 281 | } |
| 282 | |
| 283 | if cs.WTimeoutSet { |
| 284 | if wc == nil { |
| 285 | wc = writeconcern.New() |
| 286 | } |
| 287 | |
| 288 | writeconcern.WTimeout(cs.WTimeout)(wc) |
| 289 | } |
| 290 | |
| 291 | return wc |
| 292 | } |
| 293 | |
| 294 | func readPreferenceFromConnString(cs *connstring.ConnString) (*readpref.ReadPref, error) { |
| 295 | var rp *readpref.ReadPref |
| 296 | var err error |
| 297 | options := make([]readpref.Option, 0, 1) |
| 298 | |
| 299 | tagSets := tag.NewTagSetsFromMaps(cs.ReadPreferenceTagSets) |
| 300 | if len(tagSets) > 0 { |
| 301 | options = append(options, readpref.WithTagSets(tagSets...)) |
| 302 | } |
| 303 | |
| 304 | if cs.MaxStaleness != 0 { |
| 305 | options = append(options, readpref.WithMaxStaleness(cs.MaxStaleness)) |
| 306 | } |
| 307 | |
| 308 | if len(cs.ReadPreference) > 0 { |
| 309 | if rp == nil { |
| 310 | mode, _ := readpref.ModeFromString(cs.ReadPreference) |
| 311 | rp, err = readpref.New(mode, options...) |
| 312 | if err != nil { |
| 313 | return nil, err |
| 314 | } |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | return rp, nil |
| 319 | } |
| 320 | |
| 321 | // ValidSession returns an error if the session doesn't belong to the client |
| 322 | func (c *Client) ValidSession(sess *session.Client) error { |
| 323 | if sess != nil && !uuid.Equal(sess.ClientID, c.id) { |
| 324 | return ErrWrongClient |
| 325 | } |
| 326 | return nil |
| 327 | } |
| 328 | |
| 329 | // Database returns a handle for a given database. |
| 330 | func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database { |
| 331 | return newDatabase(c, name, opts...) |
| 332 | } |
| 333 | |
| 334 | // ConnectionString returns the connection string of the cluster the client is connected to. |
| 335 | func (c *Client) ConnectionString() string { |
| 336 | return c.connString.Original |
| 337 | } |
| 338 | |
| 339 | // ListDatabases returns a ListDatabasesResult. |
| 340 | func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) { |
| 341 | if ctx == nil { |
| 342 | ctx = context.Background() |
| 343 | } |
| 344 | |
| 345 | sess := sessionFromContext(ctx) |
| 346 | |
| 347 | err := c.ValidSession(sess) |
| 348 | if err != nil { |
| 349 | return ListDatabasesResult{}, err |
| 350 | } |
| 351 | |
| 352 | f, err := transformDocument(c.registry, filter) |
| 353 | if err != nil { |
| 354 | return ListDatabasesResult{}, err |
| 355 | } |
| 356 | |
| 357 | cmd := command.ListDatabases{ |
| 358 | Filter: f, |
| 359 | Session: sess, |
| 360 | Clock: c.clock, |
| 361 | } |
| 362 | |
| 363 | readSelector := description.CompositeSelector([]description.ServerSelector{ |
| 364 | description.ReadPrefSelector(readpref.Primary()), |
| 365 | description.LatencySelector(c.localThreshold), |
| 366 | }) |
| 367 | res, err := driver.ListDatabases( |
| 368 | ctx, cmd, |
| 369 | c.topology, |
| 370 | readSelector, |
| 371 | c.id, |
| 372 | c.topology.SessionPool, |
| 373 | opts..., |
| 374 | ) |
| 375 | if err != nil { |
| 376 | return ListDatabasesResult{}, replaceTopologyErr(err) |
| 377 | } |
| 378 | |
| 379 | return (ListDatabasesResult{}).fromResult(res), nil |
| 380 | } |
| 381 | |
| 382 | // ListDatabaseNames returns a slice containing the names of all of the databases on the server. |
| 383 | func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) { |
| 384 | opts = append(opts, options.ListDatabases().SetNameOnly(true)) |
| 385 | |
| 386 | res, err := c.ListDatabases(ctx, filter, opts...) |
| 387 | if err != nil { |
| 388 | return nil, err |
| 389 | } |
| 390 | |
| 391 | names := make([]string, 0) |
| 392 | for _, spec := range res.Databases { |
| 393 | names = append(names, spec.Name) |
| 394 | } |
| 395 | |
| 396 | return names, nil |
| 397 | } |
| 398 | |
| 399 | // WithSession allows a user to start a session themselves and manage |
| 400 | // its lifetime. The only way to provide a session to a CRUD method is |
| 401 | // to invoke that CRUD method with the mongo.SessionContext within the |
| 402 | // closure. The mongo.SessionContext can be used as a regular context, |
| 403 | // so methods like context.WithDeadline and context.WithTimeout are |
| 404 | // supported. |
| 405 | // |
| 406 | // If the context.Context already has a mongo.Session attached, that |
| 407 | // mongo.Session will be replaced with the one provided. |
| 408 | // |
| 409 | // Errors returned from the closure are transparently returned from |
| 410 | // this function. |
| 411 | func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error { |
| 412 | return fn(contextWithSession(ctx, sess)) |
| 413 | } |
| 414 | |
| 415 | // UseSession creates a default session, that is only valid for the |
| 416 | // lifetime of the closure. No cleanup outside of closing the session |
| 417 | // is done upon exiting the closure. This means that an outstanding |
| 418 | // transaction will be aborted, even if the closure returns an error. |
| 419 | // |
| 420 | // If ctx already contains a mongo.Session, that mongo.Session will be |
| 421 | // replaced with the newly created mongo.Session. |
| 422 | // |
| 423 | // Errors returned from the closure are transparently returned from |
| 424 | // this method. |
| 425 | func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error { |
| 426 | return c.UseSessionWithOptions(ctx, options.Session(), fn) |
| 427 | } |
| 428 | |
| 429 | // UseSessionWithOptions works like UseSession but allows the caller |
| 430 | // to specify the options used to create the session. |
| 431 | func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error { |
| 432 | defaultSess, err := c.StartSession(opts) |
| 433 | if err != nil { |
| 434 | return err |
| 435 | } |
| 436 | |
| 437 | defer defaultSess.EndSession(ctx) |
| 438 | |
| 439 | sessCtx := sessionContext{ |
| 440 | Context: context.WithValue(ctx, sessionKey{}, defaultSess), |
| 441 | Session: defaultSess, |
| 442 | } |
| 443 | |
| 444 | return fn(sessCtx) |
| 445 | } |
| 446 | |
| 447 | // Watch returns a change stream cursor used to receive information of changes to the client. This method is preferred |
| 448 | // to running a raw aggregation with a $changeStream stage because it supports resumability in the case of some errors. |
| 449 | // The client must have read concern majority or no read concern for a change stream to be created successfully. |
| 450 | func (c *Client) Watch(ctx context.Context, pipeline interface{}, |
| 451 | opts ...*options.ChangeStreamOptions) (*ChangeStream, error) { |
| 452 | |
| 453 | return newClientChangeStream(ctx, c, pipeline, opts...) |
| 454 | } |