blob: 9c31b6ec89bff91509dd2bb3eb69adf051da5ffd [file] [log] [blame]
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package description
import (
"fmt"
"math"
"time"
"github.com/mongodb/mongo-go-driver/mongo/readpref"
"github.com/mongodb/mongo-go-driver/tag"
)
// ServerSelector is implemented by anything that can narrow a list of
// candidate servers for a given topology description.
type ServerSelector interface {
	// SelectServer receives the topology description together with the
	// current candidate servers and returns the servers it selects, or an
	// error.
	SelectServer(t Topology, candidates []Server) ([]Server, error)
}
// ServerSelectorFunc adapts an ordinary function with the SelectServer
// signature so that it can be used wherever a ServerSelector is expected.
type ServerSelectorFunc func(t Topology, candidates []Server) ([]Server, error)
// SelectServer satisfies ServerSelector by invoking the underlying function
// with the same arguments and handing back its results unchanged.
func (ssf ServerSelectorFunc) SelectServer(t Topology, s []Server) ([]Server, error) {
	selected, err := ssf(t, s)
	return selected, err
}
// compositeSelector is the ServerSelector returned by CompositeSelector. Its
// SelectServer method runs every entry of selectors in slice order, passing
// the servers returned by one selector on to the next.
type compositeSelector struct {
	// selectors holds the selectors to apply, in the order they are run.
	selectors []ServerSelector
}
// CompositeSelector builds one ServerSelector out of several. The returned
// selector applies the given selectors in slice order, each one filtering the
// output of the one before it. The slice is stored as passed in, not copied.
func CompositeSelector(selectors []ServerSelector) ServerSelector {
	combined := new(compositeSelector)
	combined.selectors = selectors
	return combined
}
// SelectServer threads the candidates through every wrapped selector in
// order. The first selector that reports an error aborts the chain, in which
// case the result is nil together with that error. With no wrapped selectors
// the candidates are returned as given.
func (cs *compositeSelector) SelectServer(t Topology, candidates []Server) ([]Server, error) {
	remaining := candidates
	for i := range cs.selectors {
		narrowed, err := cs.selectors[i].SelectServer(t, remaining)
		if err != nil {
			return nil, err
		}
		remaining = narrowed
	}
	return remaining, nil
}
// latencySelector is the ServerSelector returned by LatencySelector.
type latencySelector struct {
	// latency is added to the lowest average RTT among the candidates to form
	// the upper bound a server's average RTT may have and still be selected.
	// A negative value makes SelectServer return the candidates unfiltered.
	latency time.Duration
}
// LatencySelector returns a ServerSelector that filters servers by average
// round-trip time, using latency as the width of the acceptable window above
// the fastest candidate. A negative latency yields a selector that leaves the
// candidates untouched.
func LatencySelector(latency time.Duration) ServerSelector {
	sel := new(latencySelector)
	sel.latency = latency
	return sel
}
// SelectServer keeps the candidates whose average round-trip time lies within
// ls.latency of the fastest candidate.
//
// The candidates are returned unchanged when ls.latency is negative, when
// there are fewer than two candidates, or when no candidate has an average
// RTT recorded. Otherwise candidates without a recorded average RTT are
// dropped. The topology argument is not consulted and the returned error is
// always nil.
func (ls *latencySelector) SelectServer(t Topology, candidates []Server) ([]Server, error) {
	if ls.latency < 0 || len(candidates) < 2 {
		return candidates, nil
	}

	// math.MaxInt64 doubles as the "no RTT seen yet" sentinel.
	fastest := time.Duration(math.MaxInt64)
	for _, candidate := range candidates {
		if candidate.AverageRTTSet && candidate.AverageRTT < fastest {
			fastest = candidate.AverageRTT
		}
	}
	if fastest == math.MaxInt64 {
		return candidates, nil
	}

	// ls.latency is non-negative here, so a sum smaller than fastest can only
	// mean the addition wrapped around. Saturate instead of letting a very
	// large window turn into a negative bound that rejects every server.
	limit := fastest + ls.latency
	if limit < fastest {
		limit = time.Duration(math.MaxInt64)
	}

	var result []Server
	for _, candidate := range candidates {
		if candidate.AverageRTTSet && candidate.AverageRTT <= limit {
			result = append(result, candidate)
		}
	}
	return result, nil
}
// WriteSelector returns a ServerSelector that keeps only servers able to
// accept writes. For a Single topology every candidate is passed through
// untouched. For any other topology only Mongos, RSPrimary and Standalone
// servers are kept, and the result is an empty, non-nil slice when none
// qualify. The selector never returns an error.
func WriteSelector() ServerSelector {
	selectWritable := func(t Topology, candidates []Server) ([]Server, error) {
		if t.Kind == Single {
			return candidates, nil
		}

		writable := make([]Server, 0)
		for i := range candidates {
			if k := candidates[i].Kind; k == Mongos || k == RSPrimary || k == Standalone {
				writable = append(writable, candidates[i])
			}
		}
		return writable, nil
	}
	return ServerSelectorFunc(selectWritable)
}
// ReadPrefSelector returns a ServerSelector that applies the read preference
// rp to the candidates.
//
// When rp has a max staleness set, every candidate whose kind is not Unknown
// is first checked with MaxStalenessSupported; the first failure is returned
// as the error. Selection then depends on the topology kind:
//   - Single: all candidates are returned.
//   - ReplicaSetNoPrimary, ReplicaSetWithPrimary: selectForReplicaSet decides.
//   - Sharded: only Mongos candidates are returned.
//
// Any other topology kind yields a nil slice and a nil error.
func ReadPrefSelector(rp *readpref.ReadPref) ServerSelector {
	return ServerSelectorFunc(func(t Topology, candidates []Server) ([]Server, error) {
		if _, hasMaxStaleness := rp.MaxStaleness(); hasMaxStaleness {
			for i := range candidates {
				if candidates[i].Kind == Unknown {
					continue
				}
				if err := MaxStalenessSupported(candidates[i].WireVersion); err != nil {
					return nil, err
				}
			}
		}

		if t.Kind == Single {
			return candidates, nil
		}
		if t.Kind == ReplicaSetNoPrimary || t.Kind == ReplicaSetWithPrimary {
			return selectForReplicaSet(rp, t, candidates)
		}
		if t.Kind == Sharded {
			return selectByKind(candidates, Mongos), nil
		}
		return nil, nil
	})
}
// selectForReplicaSet applies the read preference rp to the candidates of a
// replica set topology. It first validates rp's max staleness against t via
// verifyMaxStaleness and then dispatches on the read preference mode:
//   - Primary: the primaries only.
//   - PrimaryPreferred: the primaries if any, otherwise the secondaries that
//     pass the staleness and tag set filters.
//   - SecondaryPreferred: the filtered secondaries if any, otherwise the
//     primaries.
//   - Secondary: the filtered secondaries only.
//   - Nearest: primaries plus staleness-filtered secondaries, all narrowed by
//     tag set.
//
// Any other mode produces an "unsupported mode" error.
func selectForReplicaSet(rp *readpref.ReadPref, t Topology, candidates []Server) ([]Server, error) {
	if err := verifyMaxStaleness(rp, t); err != nil {
		return nil, err
	}

	primaries := func() []Server {
		return selectByKind(candidates, RSPrimary)
	}
	eligibleSecondaries := func() []Server {
		return selectByTagSet(selectSecondaries(rp, candidates), rp.TagSets())
	}

	switch rp.Mode() {
	case readpref.PrimaryMode:
		return primaries(), nil

	case readpref.PrimaryPreferredMode:
		if found := primaries(); len(found) != 0 {
			return found, nil
		}
		return eligibleSecondaries(), nil

	case readpref.SecondaryPreferredMode:
		if found := eligibleSecondaries(); len(found) > 0 {
			return found, nil
		}
		return primaries(), nil

	case readpref.SecondaryMode:
		return eligibleSecondaries(), nil

	case readpref.NearestMode:
		pool := primaries()
		pool = append(pool, selectSecondaries(rp, candidates)...)
		return selectByTagSet(pool, rp.TagSets()), nil
	}

	return nil, fmt.Errorf("unsupported mode: %d", rp.Mode())
}
// selectSecondaries returns the RSSecondary candidates, filtered by rp's max
// staleness when one is set.
//
// With a primary among the candidates, a secondary's estimated staleness is
//
//	(secondary.LastUpdateTime - secondary.LastWriteTime)
//	  - (primary.LastUpdateTime - primary.LastWriteTime)
//	  + secondary.HeartbeatInterval
//
// using the first primary found. Without a primary it is the gap between the
// most recent LastWriteTime of any secondary and the secondary's own
// LastWriteTime, plus the secondary's HeartbeatInterval. Secondaries whose
// estimate exceeds the max staleness are dropped.
func selectSecondaries(rp *readpref.ReadPref, candidates []Server) []Server {
	secondaries := selectByKind(candidates, RSSecondary)
	if len(secondaries) == 0 {
		return secondaries
	}

	maxStaleness, set := rp.MaxStaleness()
	if !set {
		return secondaries
	}

	var fresh []Server

	if primaries := selectByKind(candidates, RSPrimary); len(primaries) > 0 {
		primary := primaries[0]
		primaryLag := primary.LastUpdateTime.Sub(primary.LastWriteTime)
		for _, sec := range secondaries {
			staleness := sec.LastUpdateTime.Sub(sec.LastWriteTime) - primaryLag + sec.HeartbeatInterval
			if staleness <= maxStaleness {
				fresh = append(fresh, sec)
			}
		}
		return fresh
	}

	// No primary known: measure against the secondary with the latest write.
	latestWrite := secondaries[0].LastWriteTime
	for _, sec := range secondaries[1:] {
		if sec.LastWriteTime.After(latestWrite) {
			latestWrite = sec.LastWriteTime
		}
	}
	for _, sec := range secondaries {
		staleness := latestWrite.Sub(sec.LastWriteTime) + sec.HeartbeatInterval
		if staleness <= maxStaleness {
			fresh = append(fresh, sec)
		}
	}
	return fresh
}
// selectByTagSet narrows candidates to the servers matching the first tag set
// in tagSets that matches at least one of them.
//
// With no tag sets at all, the candidates are returned unchanged. Tag sets
// are tried in order. An empty tag set is a subset of every server's tags, so
// it matches all candidates, including servers that carry no tags, and the
// candidates are returned as given. For a non-empty tag set a server matches
// when it has tags and they contain every tag of the set. If no tag set
// matches any server, an empty non-nil slice is returned.
func selectByTagSet(candidates []Server, tagSets []tag.Set) []Server {
	if len(tagSets) == 0 {
		return candidates
	}

	for _, ts := range tagSets {
		// Fast path: the empty set matches every server, tagged or not. The
		// tag check below would otherwise wrongly skip untagged servers.
		if len(ts) == 0 {
			return candidates
		}

		var results []Server
		for _, s := range candidates {
			// ts is non-empty here, so an untagged server can never match.
			if len(s.Tags) > 0 && s.Tags.ContainsAll(ts) {
				results = append(results, s)
			}
		}
		if len(results) > 0 {
			return results
		}
	}

	return []Server{}
}
// selectByKind returns, in their original order, the candidates whose Kind
// equals kind. The result is nil when nothing matches.
func selectByKind(candidates []Server, kind ServerKind) []Server {
	var matched []Server
	for i := range candidates {
		if candidates[i].Kind != kind {
			continue
		}
		matched = append(matched, candidates[i])
	}
	return matched
}
// verifyMaxStaleness checks that rp's max staleness, when set, is usable with
// topology t. It returns nil when no max staleness is set. Otherwise the
// value must be at least 90 seconds and, if the topology lists any servers,
// at least the first server's heartbeat interval plus a 10 second idle write
// period. A descriptive error is returned when either bound is violated.
func verifyMaxStaleness(rp *readpref.ReadPref, t Topology) error {
	const (
		smallestMaxStaleness = 90 * time.Second
		idleWritePeriod      = 10 * time.Second
	)

	maxStaleness, set := rp.MaxStaleness()
	switch {
	case !set:
		return nil
	case maxStaleness < smallestMaxStaleness:
		return fmt.Errorf("max staleness (%s) must be greater than or equal to 90s", maxStaleness)
	case len(t.Servers) < 1:
		// Open question carried over from the original author: maybe this
		// should be an error instead.
		return nil
	}

	// All servers are assumed to share the first server's heartbeat interval.
	heartbeat := t.Servers[0].HeartbeatInterval
	if maxStaleness >= heartbeat+idleWritePeriod {
		return nil
	}
	return fmt.Errorf(
		"max staleness (%s) must be greater than or equal to the heartbeat interval (%s) plus idle write period (%s)",
		maxStaleness, heartbeat, idleWritePeriod,
	)
}