blob: c39702ec47c16eee1038e85345646f61f93668a5 [file] [log] [blame]
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"fmt"
"strconv"
"sync"
"time"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)
bb.cfg.Logger.Debug(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}
type builder struct {
cfg Config
}
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
currentConn: nil,
csEvltr: &connectivityStateEvaluator{},
// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if bb.lg == nil {
bb.lg = zap.NewNop()
}
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }
// Balancer defines client balancer interface.
type Balancer interface {
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer
// Picker calls "Pick" for every client request.
picker.Picker
}
type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger
mu sync.RWMutex
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]connectivity.State
currentConn balancer.ClientConn
currentState connectivity.State
csEvltr *connectivityStateEvaluator
picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
bb.mu.Lock()
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = connectivity.Idle
sc.Connect()
}
}
for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)
bb.lg.Info(
"removed subconn",
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.String("state", s.String()),
)
return
}
bb.lg.Info(
"state changed",
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == connectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)
bb.scToSt[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
oldAggrState := bb.currentState
bb.currentState = bb.csEvltr.recordTransition(old, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (old == connectivity.Ready) ||
(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
bb.regeneratePicker()
}
bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
}
func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
bb.lg.Info(
"generated transient error picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}
// only pass ready subconns to picker
scs := make([]balancer.SubConn, 0)
addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
scs = append(scs, sc)
addrToSc[addr] = sc
scToAddr[sc] = addr
}
}
switch bb.policy {
case picker.RoundrobinBalanced:
bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
}
bb.lg.Info(
"generated picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),
)
}
// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}