blob: d8d8d161f05c3b69d70d5388ab95d8c16c67d81a [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package driver
8
9import (
10 "context"
11
12 "github.com/mongodb/mongo-go-driver/mongo/options"
13 "github.com/mongodb/mongo-go-driver/x/bsonx"
14
15 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
16 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
17 "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
18 "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
19 "github.com/mongodb/mongo-go-driver/x/network/command"
20 "github.com/mongodb/mongo-go-driver/x/network/description"
21 "github.com/mongodb/mongo-go-driver/x/network/result"
22)
23
24// Update handles the full cycle dispatch and execution of an update command against the provided
25// topology.
26func Update(
27 ctx context.Context,
28 cmd command.Update,
29 topo *topology.Topology,
30 selector description.ServerSelector,
31 clientID uuid.UUID,
32 pool *session.Pool,
33 retryWrite bool,
34 opts ...*options.UpdateOptions,
35) (result.Update, error) {
36
37 ss, err := topo.SelectServer(ctx, selector)
38 if err != nil {
39 return result.Update{}, err
40 }
41
42 // If no explicit session and deployment supports sessions, start implicit session.
43 if cmd.Session == nil && topo.SupportsSessions() {
44 cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
45 if err != nil {
46 return result.Update{}, err
47 }
48 defer cmd.Session.EndSession()
49 }
50
51 updateOpts := options.MergeUpdateOptions(opts...)
52
53 if updateOpts.ArrayFilters != nil {
54 if ss.Description().WireVersion.Max < 6 {
55 return result.Update{}, ErrArrayFilters
56 }
57 arr, err := updateOpts.ArrayFilters.ToArray()
58 if err != nil {
59 return result.Update{}, err
60 }
61 cmd.Opts = append(cmd.Opts, bsonx.Elem{"arrayFilters", bsonx.Array(arr)})
62 }
63 if updateOpts.BypassDocumentValidation != nil && ss.Description().WireVersion.Includes(4) {
64 cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*updateOpts.BypassDocumentValidation)})
65 }
66 if updateOpts.Collation != nil {
67 if ss.Description().WireVersion.Max < 5 {
68 return result.Update{}, ErrCollation
69 }
70 cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(updateOpts.Collation.ToDocument())})
71 }
72 if updateOpts.Upsert != nil {
73 cmd.Opts = append(cmd.Opts, bsonx.Elem{"upsert", bsonx.Boolean(*updateOpts.Upsert)})
74 }
75
76 // Execute in a single trip if retry writes not supported, or retry not enabled
77 if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
78 if cmd.Session != nil {
79 cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
80 }
81 return update(ctx, cmd, ss, nil)
82 }
83
84 cmd.Session.RetryWrite = retryWrite
85 cmd.Session.IncrementTxnNumber()
86
87 res, originalErr := update(ctx, cmd, ss, nil)
88
89 // Retry if appropriate
90 if cerr, ok := originalErr.(command.Error); ok && cerr.Retryable() ||
91 res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError) {
92 ss, err := topo.SelectServer(ctx, selector)
93
94 // Return original error if server selection fails or new server does not support retryable writes
95 if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
96 return res, originalErr
97 }
98
99 return update(ctx, cmd, ss, cerr)
100 }
101 return res, originalErr
102
103}
104
105func update(
106 ctx context.Context,
107 cmd command.Update,
108 ss *topology.SelectedServer,
109 oldErr error,
110) (result.Update, error) {
111 desc := ss.Description()
112
113 conn, err := ss.Connection(ctx)
114 if err != nil {
115 if oldErr != nil {
116 return result.Update{}, oldErr
117 }
118 return result.Update{}, err
119 }
120
121 if !writeconcern.AckWrite(cmd.WriteConcern) {
122 go func() {
123 defer func() { _ = recover() }()
124 defer conn.Close()
125
126 _, _ = cmd.RoundTrip(ctx, desc, conn)
127 }()
128
129 return result.Update{}, command.ErrUnacknowledgedWrite
130 }
131 defer conn.Close()
132
133 return cmd.RoundTrip(ctx, desc, conn)
134}