blob: 3682b575c229908c4940b434a1c3559e1ffcbe28 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package topology
8
9import (
10 "bytes"
11 "fmt"
12
13 "github.com/mongodb/mongo-go-driver/bson/primitive"
14 "github.com/mongodb/mongo-go-driver/x/network/address"
15 "github.com/mongodb/mongo-go-driver/x/network/description"
16)
17
18var supportedWireVersions = description.NewVersionRange(2, 6)
19var minSupportedMongoDBVersion = "2.6"
20
21type fsm struct {
22 description.Topology
23 SetName string
24 maxElectionID primitive.ObjectID
25 maxSetVersion uint32
26}
27
28func newFSM() *fsm {
29 return new(fsm)
30}
31
32// apply should operate on immutable TopologyDescriptions and Descriptions. This way we don't have to
33// lock for the entire time we're applying server description.
34func (f *fsm) apply(s description.Server) (description.Topology, error) {
35
36 newServers := make([]description.Server, len(f.Servers))
37 copy(newServers, f.Servers)
38
39 oldMinutes := f.SessionTimeoutMinutes
40 f.Topology = description.Topology{
41 Kind: f.Kind,
42 Servers: newServers,
43 }
44
45 // For data bearing servers, set SessionTimeoutMinutes to the lowest among them
46 if oldMinutes == 0 {
47 // If timeout currently 0, check all servers to see if any still don't have a timeout
48 // If they all have timeout, pick the lowest.
49 timeout := s.SessionTimeoutMinutes
50 for _, server := range f.Servers {
51 if server.DataBearing() && server.SessionTimeoutMinutes < timeout {
52 timeout = server.SessionTimeoutMinutes
53 }
54 }
55 f.SessionTimeoutMinutes = timeout
56 } else {
57 if s.DataBearing() && oldMinutes > s.SessionTimeoutMinutes {
58 f.SessionTimeoutMinutes = s.SessionTimeoutMinutes
59 } else {
60 f.SessionTimeoutMinutes = oldMinutes
61 }
62 }
63
64 if _, ok := f.findServer(s.Addr); !ok {
65 return f.Topology, nil
66 }
67
68 if s.WireVersion != nil {
69 if s.WireVersion.Max < supportedWireVersions.Min {
70 return description.Topology{}, fmt.Errorf(
71 "server at %s reports wire version %d, but this version of the Go driver requires "+
72 "at least %d (MongoDB %s)",
73 s.Addr.String(),
74 s.WireVersion.Max,
75 supportedWireVersions.Min,
76 minSupportedMongoDBVersion,
77 )
78 }
79
80 if s.WireVersion.Min > supportedWireVersions.Max {
81 return description.Topology{}, fmt.Errorf(
82 "server at %s requires wire version %d, but this version of the Go driver only "+
83 "supports up to %d",
84 s.Addr.String(),
85 s.WireVersion.Min,
86 supportedWireVersions.Max,
87 )
88 }
89 }
90
91 switch f.Kind {
92 case description.Unknown:
93 f.applyToUnknown(s)
94 case description.Sharded:
95 f.applyToSharded(s)
96 case description.ReplicaSetNoPrimary:
97 f.applyToReplicaSetNoPrimary(s)
98 case description.ReplicaSetWithPrimary:
99 f.applyToReplicaSetWithPrimary(s)
100 case description.Single:
101 f.applyToSingle(s)
102 }
103
104 return f.Topology, nil
105}
106
107func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) {
108 switch s.Kind {
109 case description.Standalone, description.Mongos:
110 f.removeServerByAddr(s.Addr)
111 case description.RSPrimary:
112 f.updateRSFromPrimary(s)
113 case description.RSSecondary, description.RSArbiter, description.RSMember:
114 f.updateRSWithoutPrimary(s)
115 case description.Unknown, description.RSGhost:
116 f.replaceServer(s)
117 }
118}
119
120func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) {
121 switch s.Kind {
122 case description.Standalone, description.Mongos:
123 f.removeServerByAddr(s.Addr)
124 f.checkIfHasPrimary()
125 case description.RSPrimary:
126 f.updateRSFromPrimary(s)
127 case description.RSSecondary, description.RSArbiter, description.RSMember:
128 f.updateRSWithPrimaryFromMember(s)
129 case description.Unknown, description.RSGhost:
130 f.replaceServer(s)
131 f.checkIfHasPrimary()
132 }
133}
134
135func (f *fsm) applyToSharded(s description.Server) {
136 switch s.Kind {
137 case description.Mongos, description.Unknown:
138 f.replaceServer(s)
139 case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
140 f.removeServerByAddr(s.Addr)
141 }
142}
143
144func (f *fsm) applyToSingle(s description.Server) {
145 switch s.Kind {
146 case description.Unknown:
147 f.replaceServer(s)
148 case description.Standalone, description.Mongos:
149 if f.SetName != "" {
150 f.removeServerByAddr(s.Addr)
151 return
152 }
153
154 f.replaceServer(s)
155 case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
156 if f.SetName != "" && f.SetName != s.SetName {
157 f.removeServerByAddr(s.Addr)
158 return
159 }
160
161 f.replaceServer(s)
162 }
163}
164
165func (f *fsm) applyToUnknown(s description.Server) {
166 switch s.Kind {
167 case description.Mongos:
168 f.setKind(description.Sharded)
169 f.replaceServer(s)
170 case description.RSPrimary:
171 f.updateRSFromPrimary(s)
172 case description.RSSecondary, description.RSArbiter, description.RSMember:
173 f.setKind(description.ReplicaSetNoPrimary)
174 f.updateRSWithoutPrimary(s)
175 case description.Standalone:
176 f.updateUnknownWithStandalone(s)
177 case description.Unknown, description.RSGhost:
178 f.replaceServer(s)
179 }
180}
181
182func (f *fsm) checkIfHasPrimary() {
183 if _, ok := f.findPrimary(); ok {
184 f.setKind(description.ReplicaSetWithPrimary)
185 } else {
186 f.setKind(description.ReplicaSetNoPrimary)
187 }
188}
189
190func (f *fsm) updateRSFromPrimary(s description.Server) {
191 if f.SetName == "" {
192 f.SetName = s.SetName
193 } else if f.SetName != s.SetName {
194 f.removeServerByAddr(s.Addr)
195 f.checkIfHasPrimary()
196 return
197 }
198
199 if s.SetVersion != 0 && !bytes.Equal(s.ElectionID[:], primitive.NilObjectID[:]) {
200 if f.maxSetVersion > s.SetVersion || bytes.Compare(f.maxElectionID[:], s.ElectionID[:]) == 1 {
201 f.replaceServer(description.Server{
202 Addr: s.Addr,
203 LastError: fmt.Errorf("was a primary, but its set version or election id is stale"),
204 })
205 f.checkIfHasPrimary()
206 return
207 }
208
209 f.maxElectionID = s.ElectionID
210 }
211
212 if s.SetVersion > f.maxSetVersion {
213 f.maxSetVersion = s.SetVersion
214 }
215
216 if j, ok := f.findPrimary(); ok {
217 f.setServer(j, description.Server{
218 Addr: f.Servers[j].Addr,
219 LastError: fmt.Errorf("was a primary, but a new primary was discovered"),
220 })
221 }
222
223 f.replaceServer(s)
224
225 for j := len(f.Servers) - 1; j >= 0; j-- {
226 found := false
227 for _, member := range s.Members {
228 if member == f.Servers[j].Addr {
229 found = true
230 break
231 }
232 }
233 if !found {
234 f.removeServer(j)
235 }
236 }
237
238 for _, member := range s.Members {
239 if _, ok := f.findServer(member); !ok {
240 f.addServer(member)
241 }
242 }
243
244 f.checkIfHasPrimary()
245}
246
247func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) {
248 if f.SetName != s.SetName {
249 f.removeServerByAddr(s.Addr)
250 f.checkIfHasPrimary()
251 return
252 }
253
254 if s.Addr != s.CanonicalAddr {
255 f.removeServerByAddr(s.Addr)
256 f.checkIfHasPrimary()
257 return
258 }
259
260 f.replaceServer(s)
261
262 if _, ok := f.findPrimary(); !ok {
263 f.setKind(description.ReplicaSetNoPrimary)
264 }
265}
266
267func (f *fsm) updateRSWithoutPrimary(s description.Server) {
268 if f.SetName == "" {
269 f.SetName = s.SetName
270 } else if f.SetName != s.SetName {
271 f.removeServerByAddr(s.Addr)
272 return
273 }
274
275 for _, member := range s.Members {
276 if _, ok := f.findServer(member); !ok {
277 f.addServer(member)
278 }
279 }
280
281 if s.Addr != s.CanonicalAddr {
282 f.removeServerByAddr(s.Addr)
283 return
284 }
285
286 f.replaceServer(s)
287}
288
289func (f *fsm) updateUnknownWithStandalone(s description.Server) {
290 if len(f.Servers) > 1 {
291 f.removeServerByAddr(s.Addr)
292 return
293 }
294
295 f.setKind(description.Single)
296 f.replaceServer(s)
297}
298
299func (f *fsm) addServer(addr address.Address) {
300 f.Servers = append(f.Servers, description.Server{
301 Addr: addr.Canonicalize(),
302 })
303}
304
305func (f *fsm) findPrimary() (int, bool) {
306 for i, s := range f.Servers {
307 if s.Kind == description.RSPrimary {
308 return i, true
309 }
310 }
311
312 return 0, false
313}
314
315func (f *fsm) findServer(addr address.Address) (int, bool) {
316 canon := addr.Canonicalize()
317 for i, s := range f.Servers {
318 if canon == s.Addr {
319 return i, true
320 }
321 }
322
323 return 0, false
324}
325
326func (f *fsm) removeServer(i int) {
327 f.Servers = append(f.Servers[:i], f.Servers[i+1:]...)
328}
329
330func (f *fsm) removeServerByAddr(addr address.Address) {
331 if i, ok := f.findServer(addr); ok {
332 f.removeServer(i)
333 }
334}
335
336func (f *fsm) replaceServer(s description.Server) bool {
337 if i, ok := f.findServer(s.Addr); ok {
338 f.setServer(i, s)
339 return true
340 }
341 return false
342}
343
344func (f *fsm) setServer(i int, s description.Server) {
345 f.Servers[i] = s
346}
347
348func (f *fsm) setKind(k description.TopologyKind) {
349 f.Kind = k
350}