| // Copyright 2018 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package balancer implements client balancer. |
| package balancer |
| |
| import ( |
| "strconv" |
| "sync" |
| "time" |
| |
| "go.etcd.io/etcd/clientv3/balancer/connectivity" |
| "go.etcd.io/etcd/clientv3/balancer/picker" |
| |
| "go.uber.org/zap" |
| "google.golang.org/grpc/balancer" |
| grpcconnectivity "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/resolver" |
| _ "google.golang.org/grpc/resolver/dns" // register DNS resolver |
| _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver |
| ) |
| |
| // Config defines balancer configurations. |
| type Config struct { |
| // Policy configures balancer policy. |
| Policy picker.Policy |
| |
| // Picker implements gRPC picker. |
| // Leave empty if "Policy" field is not custom. |
| // TODO: currently custom policy is not supported. |
| // Picker picker.Picker |
| |
| // Name defines an additional name for balancer. |
| // Useful for balancer testing to avoid register conflicts. |
| // If empty, defaults to policy name. |
| Name string |
| |
| // Logger configures balancer logging. |
| // If nil, logs are discarded. |
| Logger *zap.Logger |
| } |
| |
| // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it |
| // must be invoked at initialization time. |
| func RegisterBuilder(cfg Config) { |
| bb := &builder{cfg} |
| balancer.Register(bb) |
| |
| bb.cfg.Logger.Debug( |
| "registered balancer", |
| zap.String("policy", bb.cfg.Policy.String()), |
| zap.String("name", bb.cfg.Name), |
| ) |
| } |
| |
| type builder struct { |
| cfg Config |
| } |
| |
| // Build is called initially when creating "ccBalancerWrapper". |
| // "grpc.Dial" is called to this client connection. |
| // Then, resolved addresses will be handled via "HandleResolvedAddrs". |
| func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { |
| bb := &baseBalancer{ |
| id: strconv.FormatInt(time.Now().UnixNano(), 36), |
| policy: b.cfg.Policy, |
| name: b.cfg.Name, |
| lg: b.cfg.Logger, |
| |
| addrToSc: make(map[resolver.Address]balancer.SubConn), |
| scToAddr: make(map[balancer.SubConn]resolver.Address), |
| scToSt: make(map[balancer.SubConn]grpcconnectivity.State), |
| |
| currentConn: nil, |
| connectivityRecorder: connectivity.New(b.cfg.Logger), |
| |
| // initialize picker always returns "ErrNoSubConnAvailable" |
| picker: picker.NewErr(balancer.ErrNoSubConnAvailable), |
| } |
| |
| // TODO: support multiple connections |
| bb.mu.Lock() |
| bb.currentConn = cc |
| bb.mu.Unlock() |
| |
| bb.lg.Info( |
| "built balancer", |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| zap.String("resolver-target", cc.Target()), |
| ) |
| return bb |
| } |
| |
| // Name implements "grpc/balancer.Builder" interface. |
| func (b *builder) Name() string { return b.cfg.Name } |
| |
| // Balancer defines client balancer interface. |
| type Balancer interface { |
| // Balancer is called on specified client connection. Client initiates gRPC |
| // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved |
| // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". |
| // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". |
| // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state |
| // changes, thus requires failover logic in this method. |
| balancer.Balancer |
| |
| // Picker calls "Pick" for every client request. |
| picker.Picker |
| } |
| |
| type baseBalancer struct { |
| id string |
| policy picker.Policy |
| name string |
| lg *zap.Logger |
| |
| mu sync.RWMutex |
| |
| addrToSc map[resolver.Address]balancer.SubConn |
| scToAddr map[balancer.SubConn]resolver.Address |
| scToSt map[balancer.SubConn]grpcconnectivity.State |
| |
| currentConn balancer.ClientConn |
| connectivityRecorder connectivity.Recorder |
| |
| picker picker.Picker |
| } |
| |
| // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. |
| // gRPC sends initial or updated resolved addresses from "Build". |
| func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { |
| if err != nil { |
| bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) |
| return |
| } |
| bb.lg.Info("resolved", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.Strings("addresses", addrsToStrings(addrs)), |
| ) |
| |
| bb.mu.Lock() |
| defer bb.mu.Unlock() |
| |
| resolved := make(map[resolver.Address]struct{}) |
| for _, addr := range addrs { |
| resolved[addr] = struct{}{} |
| if _, ok := bb.addrToSc[addr]; !ok { |
| sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) |
| if err != nil { |
| bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) |
| continue |
| } |
| bb.lg.Info("created subconn", zap.String("address", addr.Addr)) |
| bb.addrToSc[addr] = sc |
| bb.scToAddr[sc] = addr |
| bb.scToSt[sc] = grpcconnectivity.Idle |
| sc.Connect() |
| } |
| } |
| |
| for addr, sc := range bb.addrToSc { |
| if _, ok := resolved[addr]; !ok { |
| // was removed by resolver or failed to create subconn |
| bb.currentConn.RemoveSubConn(sc) |
| delete(bb.addrToSc, addr) |
| |
| bb.lg.Info( |
| "removed subconn", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.String("address", addr.Addr), |
| zap.String("subconn", scToString(sc)), |
| ) |
| |
| // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. |
| // The entry will be deleted in HandleSubConnStateChange. |
| // (DO NOT) delete(bb.scToAddr, sc) |
| // (DO NOT) delete(bb.scToSt, sc) |
| } |
| } |
| } |
| |
| // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. |
| func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) { |
| bb.mu.Lock() |
| defer bb.mu.Unlock() |
| |
| old, ok := bb.scToSt[sc] |
| if !ok { |
| bb.lg.Warn( |
| "state change for an unknown subconn", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.String("subconn", scToString(sc)), |
| zap.Int("subconn-size", len(bb.scToAddr)), |
| zap.String("state", s.String()), |
| ) |
| return |
| } |
| |
| bb.lg.Info( |
| "state changed", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.Bool("connected", s == grpcconnectivity.Ready), |
| zap.String("subconn", scToString(sc)), |
| zap.Int("subconn-size", len(bb.scToAddr)), |
| zap.String("address", bb.scToAddr[sc].Addr), |
| zap.String("old-state", old.String()), |
| zap.String("new-state", s.String()), |
| ) |
| |
| bb.scToSt[sc] = s |
| switch s { |
| case grpcconnectivity.Idle: |
| sc.Connect() |
| case grpcconnectivity.Shutdown: |
| // When an address was removed by resolver, b called RemoveSubConn but |
| // kept the sc's state in scToSt. Remove state for this sc here. |
| delete(bb.scToAddr, sc) |
| delete(bb.scToSt, sc) |
| } |
| |
| oldAggrState := bb.connectivityRecorder.GetCurrentState() |
| bb.connectivityRecorder.RecordTransition(old, s) |
| |
| // Update balancer picker when one of the following happens: |
| // - this sc became ready from not-ready |
| // - this sc became not-ready from ready |
| // - the aggregated state of balancer became TransientFailure from non-TransientFailure |
| // - the aggregated state of balancer became non-TransientFailure from TransientFailure |
| if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) || |
| (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) { |
| bb.updatePicker() |
| } |
| |
| bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker) |
| } |
| |
| func (bb *baseBalancer) updatePicker() { |
| if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure { |
| bb.picker = picker.NewErr(balancer.ErrTransientFailure) |
| bb.lg.Info( |
| "updated picker to transient error picker", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| ) |
| return |
| } |
| |
| // only pass ready subconns to picker |
| scToAddr := make(map[balancer.SubConn]resolver.Address) |
| for addr, sc := range bb.addrToSc { |
| if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready { |
| scToAddr[sc] = addr |
| } |
| } |
| |
| bb.picker = picker.New(picker.Config{ |
| Policy: bb.policy, |
| Logger: bb.lg, |
| SubConnToResolverAddress: scToAddr, |
| }) |
| bb.lg.Info( |
| "updated picker", |
| zap.String("picker", bb.picker.String()), |
| zap.String("balancer-id", bb.id), |
| zap.String("policy", bb.policy.String()), |
| zap.Strings("subconn-ready", scsToStrings(scToAddr)), |
| zap.Int("subconn-size", len(scToAddr)), |
| ) |
| } |
| |
| // Close implements "grpc/balancer.Balancer" interface. |
| // Close is a nop because base balancer doesn't have internal state to clean up, |
| // and it doesn't need to call RemoveSubConn for the SubConns. |
| func (bb *baseBalancer) Close() { |
| // TODO |
| } |