blob: 1a5c1aa7e909a4afd7ab9a61009651bde5faea46 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package base
20
21import (
22 "context"
23
24 "google.golang.org/grpc/balancer"
25 "google.golang.org/grpc/connectivity"
26 "google.golang.org/grpc/grpclog"
27 "google.golang.org/grpc/resolver"
28)
29
30type baseBuilder struct {
31 name string
32 pickerBuilder PickerBuilder
33 config Config
34}
35
36func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
37 return &baseBalancer{
38 cc: cc,
39 pickerBuilder: bb.pickerBuilder,
40
41 subConns: make(map[resolver.Address]balancer.SubConn),
42 scStates: make(map[balancer.SubConn]connectivity.State),
43 csEvltr: &balancer.ConnectivityStateEvaluator{},
44 // Initialize picker to a picker that always return
45 // ErrNoSubConnAvailable, because when state of a SubConn changes, we
46 // may call UpdateBalancerState with this picker.
47 picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
48 config: bb.config,
49 }
50}
51
52func (bb *baseBuilder) Name() string {
53 return bb.name
54}
55
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000056var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
57
Zack Williamse940c7a2019-08-21 14:25:39 -070058type baseBalancer struct {
59 cc balancer.ClientConn
60 pickerBuilder PickerBuilder
61
62 csEvltr *balancer.ConnectivityStateEvaluator
63 state connectivity.State
64
65 subConns map[resolver.Address]balancer.SubConn
66 scStates map[balancer.SubConn]connectivity.State
67 picker balancer.Picker
68 config Config
69}
70
71func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
72 panic("not implemented")
73}
74
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000075func (b *baseBalancer) ResolverError(error) {
76 // Ignore
77}
78
79func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
divyadesai19009132020-03-04 12:58:08 +000080 // TODO: handle s.ResolverState.Err (log if not nil) once implemented.
81 // TODO: handle s.ResolverState.ServiceConfig?
82 if grpclog.V(2) {
83 grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
84 }
Zack Williamse940c7a2019-08-21 14:25:39 -070085 // addrsSet is the set converted from addrs, it's used for quick lookup of an address.
86 addrsSet := make(map[resolver.Address]struct{})
divyadesai19009132020-03-04 12:58:08 +000087 for _, a := range s.ResolverState.Addresses {
Zack Williamse940c7a2019-08-21 14:25:39 -070088 addrsSet[a] = struct{}{}
89 if _, ok := b.subConns[a]; !ok {
90 // a is a new address (not existing in b.subConns).
91 sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
92 if err != nil {
93 grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
94 continue
95 }
96 b.subConns[a] = sc
97 b.scStates[sc] = connectivity.Idle
98 sc.Connect()
99 }
100 }
101 for a, sc := range b.subConns {
102 // a was removed by resolver.
103 if _, ok := addrsSet[a]; !ok {
104 b.cc.RemoveSubConn(sc)
105 delete(b.subConns, a)
106 // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
107 // The entry will be deleted in HandleSubConnStateChange.
108 }
109 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000110 return nil
Zack Williamse940c7a2019-08-21 14:25:39 -0700111}
112
113// regeneratePicker takes a snapshot of the balancer, and generates a picker
114// from it. The picker is
115// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
116// - built by the pickerBuilder with all READY SubConns otherwise.
117func (b *baseBalancer) regeneratePicker() {
118 if b.state == connectivity.TransientFailure {
119 b.picker = NewErrPicker(balancer.ErrTransientFailure)
120 return
121 }
122 readySCs := make(map[resolver.Address]balancer.SubConn)
123
124 // Filter out all ready SCs from full subConn map.
125 for addr, sc := range b.subConns {
126 if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
127 readySCs[addr] = sc
128 }
129 }
130 b.picker = b.pickerBuilder.Build(readySCs)
131}
132
133func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
134 panic("not implemented")
135}
136
137func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
138 s := state.ConnectivityState
divyadesai19009132020-03-04 12:58:08 +0000139 if grpclog.V(2) {
140 grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
141 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700142 oldS, ok := b.scStates[sc]
143 if !ok {
divyadesai19009132020-03-04 12:58:08 +0000144 if grpclog.V(2) {
145 grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
146 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700147 return
148 }
149 b.scStates[sc] = s
150 switch s {
151 case connectivity.Idle:
152 sc.Connect()
153 case connectivity.Shutdown:
154 // When an address was removed by resolver, b called RemoveSubConn but
155 // kept the sc's state in scStates. Remove state for this sc here.
156 delete(b.scStates, sc)
157 }
158
159 oldAggrState := b.state
160 b.state = b.csEvltr.RecordTransition(oldS, s)
161
162 // Regenerate picker when one of the following happens:
163 // - this sc became ready from not-ready
164 // - this sc became not-ready from ready
165 // - the aggregated state of balancer became TransientFailure from non-TransientFailure
166 // - the aggregated state of balancer became non-TransientFailure from TransientFailure
167 if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
168 (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
169 b.regeneratePicker()
170 }
171
172 b.cc.UpdateBalancerState(b.state, b.picker)
173}
174
175// Close is a nop because base balancer doesn't have internal state to clean up,
176// and it doesn't need to call RemoveSubConn for the SubConns.
177func (b *baseBalancer) Close() {
178}
179
180// NewErrPicker returns a picker that always returns err on Pick().
181func NewErrPicker(err error) balancer.Picker {
182 return &errPicker{err: err}
183}
184
// errPicker is a picker that never selects a SubConn; every Pick() fails
// with the stored error.
type errPicker struct {
	// err is the error returned by every Pick() call.
	err error
}
188
189func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
190 return nil, nil, p.err
191}