blob: d7b65477ee3b1d19a2ee4e7b341dae09736ce940 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package command
8
9import (
10 "context"
11
12 "fmt"
13
14 "github.com/mongodb/mongo-go-driver/bson"
15 "github.com/mongodb/mongo-go-driver/mongo/readconcern"
16 "github.com/mongodb/mongo-go-driver/mongo/readpref"
17 "github.com/mongodb/mongo-go-driver/x/bsonx"
18 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
19 "github.com/mongodb/mongo-go-driver/x/network/description"
20 "github.com/mongodb/mongo-go-driver/x/network/wiremessage"
21)
22
23// Read represents a generic database read command.
24type Read struct {
25 DB string
26 Command bsonx.Doc
27 ReadPref *readpref.ReadPref
28 ReadConcern *readconcern.ReadConcern
29 Clock *session.ClusterClock
30 Session *session.Client
31
32 result bson.Raw
33 err error
34}
35
36func (r *Read) createReadPref(serverKind description.ServerKind, topologyKind description.TopologyKind, isOpQuery bool) bsonx.Doc {
37 doc := bsonx.Doc{}
38 rp := r.ReadPref
39
40 if rp == nil {
41 if topologyKind == description.Single && serverKind != description.Mongos {
42 return append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
43 }
44 return nil
45 }
46
47 switch rp.Mode() {
48 case readpref.PrimaryMode:
49 if serverKind == description.Mongos {
50 return nil
51 }
52 if topologyKind == description.Single {
53 return append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
54 }
55 doc = append(doc, bsonx.Elem{"mode", bsonx.String("primary")})
56 case readpref.PrimaryPreferredMode:
57 doc = append(doc, bsonx.Elem{"mode", bsonx.String("primaryPreferred")})
58 case readpref.SecondaryPreferredMode:
59 _, ok := r.ReadPref.MaxStaleness()
60 if serverKind == description.Mongos && isOpQuery && !ok && len(r.ReadPref.TagSets()) == 0 {
61 return nil
62 }
63 doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondaryPreferred")})
64 case readpref.SecondaryMode:
65 doc = append(doc, bsonx.Elem{"mode", bsonx.String("secondary")})
66 case readpref.NearestMode:
67 doc = append(doc, bsonx.Elem{"mode", bsonx.String("nearest")})
68 }
69
70 sets := make([]bsonx.Val, 0, len(r.ReadPref.TagSets()))
71 for _, ts := range r.ReadPref.TagSets() {
72 if len(ts) == 0 {
73 continue
74 }
75 set := bsonx.Doc{}
76 for _, t := range ts {
77 set = append(set, bsonx.Elem{t.Name, bsonx.String(t.Value)})
78 }
79 sets = append(sets, bsonx.Document(set))
80 }
81 if len(sets) > 0 {
82 doc = append(doc, bsonx.Elem{"tags", bsonx.Array(sets)})
83 }
84
85 if d, ok := r.ReadPref.MaxStaleness(); ok {
86 doc = append(doc, bsonx.Elem{"maxStalenessSeconds", bsonx.Int32(int32(d.Seconds()))})
87 }
88
89 return doc
90}
91
92// addReadPref will add a read preference to the query document.
93//
94// NOTE: This method must always return either a valid bson.Reader or an error.
95func (r *Read) addReadPref(rp *readpref.ReadPref, serverKind description.ServerKind, topologyKind description.TopologyKind, query bson.Raw) (bson.Raw, error) {
96 doc := r.createReadPref(serverKind, topologyKind, true)
97 if doc == nil {
98 return query, nil
99 }
100
101 qdoc := bsonx.Doc{}
102 err := bson.Unmarshal(query, &qdoc)
103 if err != nil {
104 return query, err
105 }
106 return bsonx.Doc{
107 {"$query", bsonx.Document(qdoc)},
108 {"$readPreference", bsonx.Document(doc)},
109 }.MarshalBSON()
110}
111
112// Encode r as OP_MSG
113func (r *Read) encodeOpMsg(desc description.SelectedServer, cmd bsonx.Doc) (wiremessage.WireMessage, error) {
114 msg := wiremessage.Msg{
115 MsgHeader: wiremessage.Header{RequestID: wiremessage.NextRequestID()},
116 Sections: make([]wiremessage.Section, 0),
117 }
118
119 readPrefDoc := r.createReadPref(desc.Server.Kind, desc.Kind, false)
120 fullDocRdr, err := opmsgAddGlobals(cmd, r.DB, readPrefDoc)
121 if err != nil {
122 return nil, err
123 }
124
125 // type 0 doc
126 msg.Sections = append(msg.Sections, wiremessage.SectionBody{
127 PayloadType: wiremessage.SingleDocument,
128 Document: fullDocRdr,
129 })
130
131 // no flags to add
132
133 return msg, nil
134}
135
136func (r *Read) slaveOK(desc description.SelectedServer) wiremessage.QueryFlag {
137 if desc.Kind == description.Single && desc.Server.Kind != description.Mongos {
138 return wiremessage.SlaveOK
139 }
140
141 if r.ReadPref == nil {
142 // assume primary
143 return 0
144 }
145
146 if r.ReadPref.Mode() != readpref.PrimaryMode {
147 return wiremessage.SlaveOK
148 }
149
150 return 0
151}
152
153// Encode c as OP_QUERY
154func (r *Read) encodeOpQuery(desc description.SelectedServer, cmd bsonx.Doc) (wiremessage.WireMessage, error) {
155 rdr, err := marshalCommand(cmd)
156 if err != nil {
157 return nil, err
158 }
159
160 if desc.Server.Kind == description.Mongos {
161 rdr, err = r.addReadPref(r.ReadPref, desc.Server.Kind, desc.Kind, rdr)
162 if err != nil {
163 return nil, err
164 }
165 }
166
167 query := wiremessage.Query{
168 MsgHeader: wiremessage.Header{RequestID: wiremessage.NextRequestID()},
169 FullCollectionName: r.DB + ".$cmd",
170 Flags: r.slaveOK(desc),
171 NumberToReturn: -1,
172 Query: rdr,
173 }
174
175 return query, nil
176}
177
178func (r *Read) decodeOpMsg(wm wiremessage.WireMessage) {
179 msg, ok := wm.(wiremessage.Msg)
180 if !ok {
181 r.err = fmt.Errorf("unsupported response wiremessage type %T", wm)
182 return
183 }
184
185 r.result, r.err = decodeCommandOpMsg(msg)
186}
187
188func (r *Read) decodeOpReply(wm wiremessage.WireMessage) {
189 reply, ok := wm.(wiremessage.Reply)
190 if !ok {
191 r.err = fmt.Errorf("unsupported response wiremessage type %T", wm)
192 return
193 }
194 r.result, r.err = decodeCommandOpReply(reply)
195}
196
197// Encode will encode this command into a wire message for the given server description.
198func (r *Read) Encode(desc description.SelectedServer) (wiremessage.WireMessage, error) {
199 cmd := r.Command.Copy()
200 cmd, err := addReadConcern(cmd, desc, r.ReadConcern, r.Session)
201 if err != nil {
202 return nil, err
203 }
204
205 cmd, err = addSessionFields(cmd, desc, r.Session)
206 if err != nil {
207 return nil, err
208 }
209
210 cmd = addClusterTime(cmd, desc, r.Session, r.Clock)
211
212 if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion {
213 return r.encodeOpQuery(desc, cmd)
214 }
215
216 return r.encodeOpMsg(desc, cmd)
217}
218
219// Decode will decode the wire message using the provided server description. Errors during decoding
220// are deferred until either the Result or Err methods are called.
221func (r *Read) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *Read {
222 switch wm.(type) {
223 case wiremessage.Reply:
224 r.decodeOpReply(wm)
225 default:
226 r.decodeOpMsg(wm)
227 }
228
229 if r.err != nil {
230 // decode functions set error if an invalid response document was returned or if the OK flag in the response was 0
231 // if the OK flag was 0, a type Error is returned. otherwise, a special type is returned
232 if _, ok := r.err.(Error); !ok {
233 return r // for missing/invalid response docs, don't update cluster times
234 }
235 }
236
237 _ = updateClusterTimes(r.Session, r.Clock, r.result)
238 _ = updateOperationTime(r.Session, r.result)
239 return r
240}
241
242// Result returns the result of a decoded wire message and server description.
243func (r *Read) Result() (bson.Raw, error) {
244 if r.err != nil {
245 return nil, r.err
246 }
247
248 return r.result, nil
249}
250
251// Err returns the error set on this command.
252func (r *Read) Err() error {
253 return r.err
254}
255
256// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter.
257func (r *Read) RoundTrip(ctx context.Context, desc description.SelectedServer, rw wiremessage.ReadWriter) (bson.Raw, error) {
258 wm, err := r.Encode(desc)
259 if err != nil {
260 return nil, err
261 }
262
263 err = rw.WriteWireMessage(ctx, wm)
264 if err != nil {
265 if _, ok := err.(Error); ok {
266 return nil, err
267 }
268 // Connection errors are transient
269 return nil, Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}}
270 }
271 wm, err = rw.ReadWireMessage(ctx)
272 if err != nil {
273 if _, ok := err.(Error); ok {
274 return nil, err
275 }
276 // Connection errors are transient
277 return nil, Error{Message: err.Error(), Labels: []string{TransientTransactionError, NetworkError}}
278 }
279
280 if r.Session != nil {
281 err = r.Session.UpdateUseTime()
282 if err != nil {
283 return nil, err
284 }
285 }
286 return r.Decode(desc, wm).Result()
287}