blob: 3682b575c229908c4940b434a1c3559e1ffcbe28 [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package topology
import (
"bytes"
"fmt"
"github.com/mongodb/mongo-go-driver/bson/primitive"
"github.com/mongodb/mongo-go-driver/x/network/address"
"github.com/mongodb/mongo-go-driver/x/network/description"
)
var supportedWireVersions = description.NewVersionRange(2, 6)
var minSupportedMongoDBVersion = "2.6"
type fsm struct {
description.Topology
SetName string
maxElectionID primitive.ObjectID
maxSetVersion uint32
}
func newFSM() *fsm {
return new(fsm)
}
// apply should operate on immutable TopologyDescriptions and Descriptions. This way we don't have to
// lock for the entire time we're applying server description.
func (f *fsm) apply(s description.Server) (description.Topology, error) {
newServers := make([]description.Server, len(f.Servers))
copy(newServers, f.Servers)
oldMinutes := f.SessionTimeoutMinutes
f.Topology = description.Topology{
Kind: f.Kind,
Servers: newServers,
}
// For data bearing servers, set SessionTimeoutMinutes to the lowest among them
if oldMinutes == 0 {
// If timeout currently 0, check all servers to see if any still don't have a timeout
// If they all have timeout, pick the lowest.
timeout := s.SessionTimeoutMinutes
for _, server := range f.Servers {
if server.DataBearing() && server.SessionTimeoutMinutes < timeout {
timeout = server.SessionTimeoutMinutes
}
}
f.SessionTimeoutMinutes = timeout
} else {
if s.DataBearing() && oldMinutes > s.SessionTimeoutMinutes {
f.SessionTimeoutMinutes = s.SessionTimeoutMinutes
} else {
f.SessionTimeoutMinutes = oldMinutes
}
}
if _, ok := f.findServer(s.Addr); !ok {
return f.Topology, nil
}
if s.WireVersion != nil {
if s.WireVersion.Max < supportedWireVersions.Min {
return description.Topology{}, fmt.Errorf(
"server at %s reports wire version %d, but this version of the Go driver requires "+
"at least %d (MongoDB %s)",
s.Addr.String(),
s.WireVersion.Max,
supportedWireVersions.Min,
minSupportedMongoDBVersion,
)
}
if s.WireVersion.Min > supportedWireVersions.Max {
return description.Topology{}, fmt.Errorf(
"server at %s requires wire version %d, but this version of the Go driver only "+
"supports up to %d",
s.Addr.String(),
s.WireVersion.Min,
supportedWireVersions.Max,
)
}
}
switch f.Kind {
case description.Unknown:
f.applyToUnknown(s)
case description.Sharded:
f.applyToSharded(s)
case description.ReplicaSetNoPrimary:
f.applyToReplicaSetNoPrimary(s)
case description.ReplicaSetWithPrimary:
f.applyToReplicaSetWithPrimary(s)
case description.Single:
f.applyToSingle(s)
}
return f.Topology, nil
}
func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) {
switch s.Kind {
case description.Standalone, description.Mongos:
f.removeServerByAddr(s.Addr)
case description.RSPrimary:
f.updateRSFromPrimary(s)
case description.RSSecondary, description.RSArbiter, description.RSMember:
f.updateRSWithoutPrimary(s)
case description.Unknown, description.RSGhost:
f.replaceServer(s)
}
}
func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) {
switch s.Kind {
case description.Standalone, description.Mongos:
f.removeServerByAddr(s.Addr)
f.checkIfHasPrimary()
case description.RSPrimary:
f.updateRSFromPrimary(s)
case description.RSSecondary, description.RSArbiter, description.RSMember:
f.updateRSWithPrimaryFromMember(s)
case description.Unknown, description.RSGhost:
f.replaceServer(s)
f.checkIfHasPrimary()
}
}
func (f *fsm) applyToSharded(s description.Server) {
switch s.Kind {
case description.Mongos, description.Unknown:
f.replaceServer(s)
case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
f.removeServerByAddr(s.Addr)
}
}
func (f *fsm) applyToSingle(s description.Server) {
switch s.Kind {
case description.Unknown:
f.replaceServer(s)
case description.Standalone, description.Mongos:
if f.SetName != "" {
f.removeServerByAddr(s.Addr)
return
}
f.replaceServer(s)
case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
if f.SetName != "" && f.SetName != s.SetName {
f.removeServerByAddr(s.Addr)
return
}
f.replaceServer(s)
}
}
func (f *fsm) applyToUnknown(s description.Server) {
switch s.Kind {
case description.Mongos:
f.setKind(description.Sharded)
f.replaceServer(s)
case description.RSPrimary:
f.updateRSFromPrimary(s)
case description.RSSecondary, description.RSArbiter, description.RSMember:
f.setKind(description.ReplicaSetNoPrimary)
f.updateRSWithoutPrimary(s)
case description.Standalone:
f.updateUnknownWithStandalone(s)
case description.Unknown, description.RSGhost:
f.replaceServer(s)
}
}
func (f *fsm) checkIfHasPrimary() {
if _, ok := f.findPrimary(); ok {
f.setKind(description.ReplicaSetWithPrimary)
} else {
f.setKind(description.ReplicaSetNoPrimary)
}
}
func (f *fsm) updateRSFromPrimary(s description.Server) {
if f.SetName == "" {
f.SetName = s.SetName
} else if f.SetName != s.SetName {
f.removeServerByAddr(s.Addr)
f.checkIfHasPrimary()
return
}
if s.SetVersion != 0 && !bytes.Equal(s.ElectionID[:], primitive.NilObjectID[:]) {
if f.maxSetVersion > s.SetVersion || bytes.Compare(f.maxElectionID[:], s.ElectionID[:]) == 1 {
f.replaceServer(description.Server{
Addr: s.Addr,
LastError: fmt.Errorf("was a primary, but its set version or election id is stale"),
})
f.checkIfHasPrimary()
return
}
f.maxElectionID = s.ElectionID
}
if s.SetVersion > f.maxSetVersion {
f.maxSetVersion = s.SetVersion
}
if j, ok := f.findPrimary(); ok {
f.setServer(j, description.Server{
Addr: f.Servers[j].Addr,
LastError: fmt.Errorf("was a primary, but a new primary was discovered"),
})
}
f.replaceServer(s)
for j := len(f.Servers) - 1; j >= 0; j-- {
found := false
for _, member := range s.Members {
if member == f.Servers[j].Addr {
found = true
break
}
}
if !found {
f.removeServer(j)
}
}
for _, member := range s.Members {
if _, ok := f.findServer(member); !ok {
f.addServer(member)
}
}
f.checkIfHasPrimary()
}
func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) {
if f.SetName != s.SetName {
f.removeServerByAddr(s.Addr)
f.checkIfHasPrimary()
return
}
if s.Addr != s.CanonicalAddr {
f.removeServerByAddr(s.Addr)
f.checkIfHasPrimary()
return
}
f.replaceServer(s)
if _, ok := f.findPrimary(); !ok {
f.setKind(description.ReplicaSetNoPrimary)
}
}
func (f *fsm) updateRSWithoutPrimary(s description.Server) {
if f.SetName == "" {
f.SetName = s.SetName
} else if f.SetName != s.SetName {
f.removeServerByAddr(s.Addr)
return
}
for _, member := range s.Members {
if _, ok := f.findServer(member); !ok {
f.addServer(member)
}
}
if s.Addr != s.CanonicalAddr {
f.removeServerByAddr(s.Addr)
return
}
f.replaceServer(s)
}
func (f *fsm) updateUnknownWithStandalone(s description.Server) {
if len(f.Servers) > 1 {
f.removeServerByAddr(s.Addr)
return
}
f.setKind(description.Single)
f.replaceServer(s)
}
func (f *fsm) addServer(addr address.Address) {
f.Servers = append(f.Servers, description.Server{
Addr: addr.Canonicalize(),
})
}
func (f *fsm) findPrimary() (int, bool) {
for i, s := range f.Servers {
if s.Kind == description.RSPrimary {
return i, true
}
}
return 0, false
}
func (f *fsm) findServer(addr address.Address) (int, bool) {
canon := addr.Canonicalize()
for i, s := range f.Servers {
if canon == s.Addr {
return i, true
}
}
return 0, false
}
func (f *fsm) removeServer(i int) {
f.Servers = append(f.Servers[:i], f.Servers[i+1:]...)
}
func (f *fsm) removeServerByAddr(addr address.Address) {
if i, ok := f.findServer(addr); ok {
f.removeServer(i)
}
}
func (f *fsm) replaceServer(s description.Server) bool {
if i, ok := f.findServer(s.Addr); ok {
f.setServer(i, s)
return true
}
return false
}
func (f *fsm) setServer(i int, s description.Server) {
f.Servers[i] = s
}
func (f *fsm) setKind(k description.TopologyKind) {
f.Kind = k
}