blob: d6c8e67d9fda8ad2b1d34b85ff40c3b500343464 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package driver
8
9import (
10 "context"
11
12 "time"
13
14 "github.com/mongodb/mongo-go-driver/bson/bsoncodec"
15 "github.com/mongodb/mongo-go-driver/mongo/options"
16 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
17 "github.com/mongodb/mongo-go-driver/x/bsonx"
18 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
19 "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
20 "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
21 "github.com/mongodb/mongo-go-driver/x/network/command"
22 "github.com/mongodb/mongo-go-driver/x/network/description"
23 "github.com/mongodb/mongo-go-driver/x/network/result"
24)
25
26// FindOneAndUpdate handles the full cycle dispatch and execution of a FindOneAndUpdate command against the provided
27// topology.
28func FindOneAndUpdate(
29 ctx context.Context,
30 cmd command.FindOneAndUpdate,
31 topo *topology.Topology,
32 selector description.ServerSelector,
33 clientID uuid.UUID,
34 pool *session.Pool,
35 retryWrite bool,
36 registry *bsoncodec.Registry,
37 opts ...*options.FindOneAndUpdateOptions,
38) (result.FindAndModify, error) {
39
40 ss, err := topo.SelectServer(ctx, selector)
41 if err != nil {
42 return result.FindAndModify{}, err
43 }
44
45 // If no explicit session and deployment supports sessions, start implicit session.
46 if cmd.Session == nil && topo.SupportsSessions() {
47 cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
48 if err != nil {
49 return result.FindAndModify{}, err
50 }
51 defer cmd.Session.EndSession()
52 }
53
54 uo := options.MergeFindOneAndUpdateOptions(opts...)
55 if uo.ArrayFilters != nil {
56 arr, err := uo.ArrayFilters.ToArray()
57 if err != nil {
58 return result.FindAndModify{}, err
59 }
60
61 cmd.Opts = append(cmd.Opts, bsonx.Elem{"arrayFilters", bsonx.Array(arr)})
62 }
63 if uo.BypassDocumentValidation != nil {
64 cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*uo.BypassDocumentValidation)})
65 }
66 if uo.Collation != nil {
67 if ss.Description().WireVersion.Max < 5 {
68 return result.FindAndModify{}, ErrCollation
69 }
70 cmd.Opts = append(cmd.Opts, bsonx.Elem{"collation", bsonx.Document(uo.Collation.ToDocument())})
71 }
72 if uo.MaxTime != nil {
73 cmd.Opts = append(cmd.Opts, bsonx.Elem{"maxTimeMS", bsonx.Int64(int64(*uo.MaxTime / time.Millisecond))})
74 }
75 if uo.Projection != nil {
76 projElem, err := interfaceToElement("fields", uo.Projection, registry)
77 if err != nil {
78 return result.FindAndModify{}, err
79 }
80
81 cmd.Opts = append(cmd.Opts, projElem)
82 }
83 if uo.ReturnDocument != nil {
84 cmd.Opts = append(cmd.Opts, bsonx.Elem{"new", bsonx.Boolean(*uo.ReturnDocument == options.After)})
85 }
86 if uo.Sort != nil {
87 sortElem, err := interfaceToElement("sort", uo.Sort, registry)
88 if err != nil {
89 return result.FindAndModify{}, err
90 }
91
92 cmd.Opts = append(cmd.Opts, sortElem)
93 }
94 if uo.Upsert != nil {
95 cmd.Opts = append(cmd.Opts, bsonx.Elem{"upsert", bsonx.Boolean(*uo.Upsert)})
96 }
97
98 // Execute in a single trip if retry writes not supported, or retry not enabled
99 if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
100 if cmd.Session != nil {
101 cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
102 }
103 return findOneAndUpdate(ctx, cmd, ss, nil)
104 }
105
106 cmd.Session.RetryWrite = retryWrite
107 cmd.Session.IncrementTxnNumber()
108
109 res, originalErr := findOneAndUpdate(ctx, cmd, ss, nil)
110
111 // Retry if appropriate
112 if cerr, ok := originalErr.(command.Error); ok && cerr.Retryable() {
113 ss, err := topo.SelectServer(ctx, selector)
114
115 // Return original error if server selection fails or new server does not support retryable writes
116 if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
117 return result.FindAndModify{}, originalErr
118 }
119
120 return findOneAndUpdate(ctx, cmd, ss, cerr)
121 }
122
123 return res, originalErr
124}
125
126func findOneAndUpdate(
127 ctx context.Context,
128 cmd command.FindOneAndUpdate,
129 ss *topology.SelectedServer,
130 oldErr error,
131) (result.FindAndModify, error) {
132 desc := ss.Description()
133 conn, err := ss.Connection(ctx)
134 if err != nil {
135 if oldErr != nil {
136 return result.FindAndModify{}, oldErr
137 }
138 return result.FindAndModify{}, err
139 }
140
141 if !writeconcern.AckWrite(cmd.WriteConcern) {
142 go func() {
143 defer func() { _ = recover() }()
144 defer conn.Close()
145
146 _, _ = cmd.RoundTrip(ctx, desc, conn)
147 }()
148
149 return result.FindAndModify{}, command.ErrUnacknowledgedWrite
150 }
151 defer conn.Close()
152
153 return cmd.RoundTrip(ctx, desc, conn)
154}