| // Copyright 2018 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package balancer |
| |
| import ( |
| "fmt" |
| "strconv" |
| "sync" |
| "time" |
| |
| "go.etcd.io/etcd/clientv3/balancer/picker" |
| |
| "go.uber.org/zap" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/resolver" |
| _ "google.golang.org/grpc/resolver/dns" // register DNS resolver |
| _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver |
| ) |
| |
| // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it |
| // must be invoked at initialization time. |
| func RegisterBuilder(cfg Config) { |
| bb := &builder{cfg} |
| balancer.Register(bb) |
| |
| bb.cfg.Logger.Debug( |
| "registered balancer", |
| zap.String("policy", bb.cfg.Policy.String()), |
| zap.String("name", bb.cfg.Name), |
| ) |
| } |
| |
| type builder struct { |
| cfg Config |
| } |
| |
| // Build is called initially when creating "ccBalancerWrapper". |
| // "grpc.Dial" is called to this client connection. |
| // Then, resolved addresses will be handled via "HandleResolvedAddrs". |
| func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { |
| bb := &baseBalancer{ |
| id: strconv.FormatInt(time.Now().UnixNano(), 36), |
| policy: b.cfg.Policy, |
| name: b.cfg.Policy.String(), |
| lg: b.cfg.Logger, |
| |
| addrToSc: make(map[resolver.Address]balancer.SubConn), |
| scToAddr: make(map[balancer.SubConn]resolver.Address), |
| scToSt: make(map[balancer.SubConn]connectivity.State), |
| |
| currentConn: nil, |
| csEvltr: &connectivityStateEvaluator{}, |
| |
| // initialize picker always returns "ErrNoSubConnAvailable" |
| Picker: picker.NewErr(balancer.ErrNoSubConnAvailable), |
| } |
| if b.cfg.Name != "" { |
| bb.name = b.cfg.Name |
| } |
| if bb.lg == nil { |
| bb.lg = zap.NewNop() |
| } |
| |
| // TODO: support multiple connections |
| bb.mu.Lock() |
| bb.currentConn = cc |
| bb.mu.Unlock() |
| |
| bb.lg.Info( |
| "built balancer", |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| zap.String("resolver-target", cc.Target()), |
| ) |
| return bb |
| } |
| |
| // Name implements "grpc/balancer.Builder" interface. |
| func (b *builder) Name() string { return b.cfg.Name } |
| |
| // Balancer defines client balancer interface. |
| type Balancer interface { |
| // Balancer is called on specified client connection. Client initiates gRPC |
| // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved |
| // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". |
| // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". |
| // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state |
| // changes, thus requires failover logic in this method. |
| balancer.Balancer |
| |
| // Picker calls "Pick" for every client request. |
| picker.Picker |
| } |
| |
| type baseBalancer struct { |
| id string |
| policy picker.Policy |
| name string |
| lg *zap.Logger |
| |
| mu sync.RWMutex |
| |
| addrToSc map[resolver.Address]balancer.SubConn |
| scToAddr map[balancer.SubConn]resolver.Address |
| scToSt map[balancer.SubConn]connectivity.State |
| |
| currentConn balancer.ClientConn |
| currentState connectivity.State |
| csEvltr *connectivityStateEvaluator |
| |
| picker.Picker |
| } |
| |
| // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. |
| // gRPC sends initial or updated resolved addresses from "Build". |
| func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { |
| if err != nil { |
| bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) |
| return |
| } |
| bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs))) |
| |
| bb.mu.Lock() |
| defer bb.mu.Unlock() |
| |
| resolved := make(map[resolver.Address]struct{}) |
| for _, addr := range addrs { |
| resolved[addr] = struct{}{} |
| if _, ok := bb.addrToSc[addr]; !ok { |
| sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) |
| if err != nil { |
| bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) |
| continue |
| } |
| bb.addrToSc[addr] = sc |
| bb.scToAddr[sc] = addr |
| bb.scToSt[sc] = connectivity.Idle |
| sc.Connect() |
| } |
| } |
| |
| for addr, sc := range bb.addrToSc { |
| if _, ok := resolved[addr]; !ok { |
| // was removed by resolver or failed to create subconn |
| bb.currentConn.RemoveSubConn(sc) |
| delete(bb.addrToSc, addr) |
| |
| bb.lg.Info( |
| "removed subconn", |
| zap.String("balancer-id", bb.id), |
| zap.String("address", addr.Addr), |
| zap.String("subconn", scToString(sc)), |
| ) |
| |
| // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. |
| // The entry will be deleted in HandleSubConnStateChange. |
| // (DO NOT) delete(bb.scToAddr, sc) |
| // (DO NOT) delete(bb.scToSt, sc) |
| } |
| } |
| } |
| |
| // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. |
| func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { |
| bb.mu.Lock() |
| defer bb.mu.Unlock() |
| |
| old, ok := bb.scToSt[sc] |
| if !ok { |
| bb.lg.Warn( |
| "state change for an unknown subconn", |
| zap.String("balancer-id", bb.id), |
| zap.String("subconn", scToString(sc)), |
| zap.String("state", s.String()), |
| ) |
| return |
| } |
| |
| bb.lg.Info( |
| "state changed", |
| zap.String("balancer-id", bb.id), |
| zap.Bool("connected", s == connectivity.Ready), |
| zap.String("subconn", scToString(sc)), |
| zap.String("address", bb.scToAddr[sc].Addr), |
| zap.String("old-state", old.String()), |
| zap.String("new-state", s.String()), |
| ) |
| |
| bb.scToSt[sc] = s |
| switch s { |
| case connectivity.Idle: |
| sc.Connect() |
| case connectivity.Shutdown: |
| // When an address was removed by resolver, b called RemoveSubConn but |
| // kept the sc's state in scToSt. Remove state for this sc here. |
| delete(bb.scToAddr, sc) |
| delete(bb.scToSt, sc) |
| } |
| |
| oldAggrState := bb.currentState |
| bb.currentState = bb.csEvltr.recordTransition(old, s) |
| |
| // Regenerate picker when one of the following happens: |
| // - this sc became ready from not-ready |
| // - this sc became not-ready from ready |
| // - the aggregated state of balancer became TransientFailure from non-TransientFailure |
| // - the aggregated state of balancer became non-TransientFailure from TransientFailure |
| if (s == connectivity.Ready) != (old == connectivity.Ready) || |
| (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { |
| bb.regeneratePicker() |
| } |
| |
| bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker) |
| return |
| } |
| |
| func (bb *baseBalancer) regeneratePicker() { |
| if bb.currentState == connectivity.TransientFailure { |
| bb.lg.Info( |
| "generated transient error picker", |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| ) |
| bb.Picker = picker.NewErr(balancer.ErrTransientFailure) |
| return |
| } |
| |
| // only pass ready subconns to picker |
| scs := make([]balancer.SubConn, 0) |
| addrToSc := make(map[resolver.Address]balancer.SubConn) |
| scToAddr := make(map[balancer.SubConn]resolver.Address) |
| for addr, sc := range bb.addrToSc { |
| if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready { |
| scs = append(scs, sc) |
| addrToSc[addr] = sc |
| scToAddr[sc] = addr |
| } |
| } |
| |
| switch bb.policy { |
| case picker.RoundrobinBalanced: |
| bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr) |
| |
| default: |
| panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy)) |
| } |
| |
| bb.lg.Info( |
| "generated picker", |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| zap.Strings("subconn-ready", scsToStrings(addrToSc)), |
| zap.Int("subconn-size", len(addrToSc)), |
| ) |
| } |
| |
| // Close implements "grpc/balancer.Balancer" interface. |
| // Close is a nop because base balancer doesn't have internal state to clean up, |
| // and it doesn't need to call RemoveSubConn for the SubConns. |
| func (bb *baseBalancer) Close() { |
| // TODO |
| } |