blob: 89ba9fa3e098a75ac6e15a6f3e0b65250cae826a [file] [log] [blame]
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "fmt"
23 "strings"
24 "sync"
25 "time"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/credentials"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/internal/grpcsync"
32 "google.golang.org/grpc/resolver"
33 "google.golang.org/grpc/serviceconfig"
34)
35
36// ccResolverWrapper is a wrapper on top of cc for resolvers.
37// It implements resolver.ClientConnection interface.
38type ccResolverWrapper struct {
39 cc *ClientConn
40 resolverMu sync.Mutex
41 resolver resolver.Resolver
42 done *grpcsync.Event
43 curState resolver.State
44
45 pollingMu sync.Mutex
46 polling chan struct{}
47}
48
// split2 cuts s around the first occurrence of sep, exactly as
// strings.SplitN(s, sep, 2) would, and returns the two pieces along with true.
// When the split does not yield two pieces (sep is not found), it returns
// ("", "", false) instead.
func split2(s, sep string) (string, string, bool) {
	parts := strings.SplitN(s, sep, 2)
	if len(parts) == 2 {
		return parts[0], parts[1], true
	}
	return "", "", false
}
58
59// parseTarget splits target into a struct containing scheme, authority and
60// endpoint.
61//
62// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
63// target}.
64func parseTarget(target string) (ret resolver.Target) {
65 var ok bool
66 ret.Scheme, ret.Endpoint, ok = split2(target, "://")
67 if !ok {
68 return resolver.Target{Endpoint: target}
69 }
70 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
71 if !ok {
72 return resolver.Target{Endpoint: target}
73 }
74 return ret
75}
76
77// newCCResolverWrapper uses the resolver.Builder stored in the ClientConn to
78// build a Resolver and returns a ccResolverWrapper object which wraps the
79// newly built resolver.
80func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
81 rb := cc.dopts.resolverBuilder
82 if rb == nil {
83 return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
84 }
85
86 ccr := &ccResolverWrapper{
87 cc: cc,
88 done: grpcsync.NewEvent(),
89 }
90
91 var credsClone credentials.TransportCredentials
92 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
93 credsClone = creds.Clone()
94 }
95 rbo := resolver.BuildOptions{
96 DisableServiceConfig: cc.dopts.disableServiceConfig,
97 DialCreds: credsClone,
98 CredsBundle: cc.dopts.copts.CredsBundle,
99 Dialer: cc.dopts.copts.Dialer,
100 }
101
102 var err error
103 // We need to hold the lock here while we assign to the ccr.resolver field
104 // to guard against a data race caused by the following code path,
105 // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
106 // accessing ccr.resolver which is being assigned here.
107 ccr.resolverMu.Lock()
108 defer ccr.resolverMu.Unlock()
109 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
110 if err != nil {
111 return nil, err
112 }
113 return ccr, nil
114}
115
116func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
117 ccr.resolverMu.Lock()
118 if !ccr.done.HasFired() {
119 ccr.resolver.ResolveNow(o)
120 }
121 ccr.resolverMu.Unlock()
122}
123
124func (ccr *ccResolverWrapper) close() {
125 ccr.resolverMu.Lock()
126 ccr.resolver.Close()
127 ccr.done.Fire()
128 ccr.resolverMu.Unlock()
129}
130
131// poll begins or ends asynchronous polling of the resolver based on whether
132// err is ErrBadResolverState.
133func (ccr *ccResolverWrapper) poll(err error) {
134 ccr.pollingMu.Lock()
135 defer ccr.pollingMu.Unlock()
136 if err != balancer.ErrBadResolverState {
137 // stop polling
138 if ccr.polling != nil {
139 close(ccr.polling)
140 ccr.polling = nil
141 }
142 return
143 }
144 if ccr.polling != nil {
145 // already polling
146 return
147 }
148 p := make(chan struct{})
149 ccr.polling = p
150 go func() {
151 for i := 0; ; i++ {
152 ccr.resolveNow(resolver.ResolveNowOptions{})
153 t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
154 select {
155 case <-p:
156 t.Stop()
157 return
158 case <-ccr.done.Done():
159 // Resolver has been closed.
160 t.Stop()
161 return
162 case <-t.C:
163 select {
164 case <-p:
165 return
166 default:
167 }
168 // Timer expired; re-resolve.
169 }
170 }
171 }()
172}
173
174func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
175 if ccr.done.HasFired() {
176 return
177 }
178 grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
179 if channelz.IsOn() {
180 ccr.addChannelzTraceEvent(s)
181 }
182 ccr.curState = s
183 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
184}
185
186func (ccr *ccResolverWrapper) ReportError(err error) {
187 if ccr.done.HasFired() {
188 return
189 }
190 grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
191 if channelz.IsOn() {
192 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
193 Desc: fmt.Sprintf("Resolver reported error: %v", err),
194 Severity: channelz.CtWarning,
195 })
196 }
197 ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
198}
199
200// NewAddress is called by the resolver implementation to send addresses to gRPC.
201func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
202 if ccr.done.HasFired() {
203 return
204 }
205 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
206 if channelz.IsOn() {
207 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
208 }
209 ccr.curState.Addresses = addrs
210 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
211}
212
213// NewServiceConfig is called by the resolver implementation to send service
214// configs to gRPC.
215func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
216 if ccr.done.HasFired() {
217 return
218 }
219 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
220 if ccr.cc.dopts.disableServiceConfig {
221 grpclog.Infof("Service config lookups disabled; ignoring config")
222 return
223 }
224 scpr := parseServiceConfig(sc)
225 if scpr.Err != nil {
226 grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
227 if channelz.IsOn() {
228 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
229 Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
230 Severity: channelz.CtWarning,
231 })
232 }
233 ccr.poll(balancer.ErrBadResolverState)
234 return
235 }
236 if channelz.IsOn() {
237 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
238 }
239 ccr.curState.ServiceConfig = scpr
240 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
241}
242
243func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
244 return parseServiceConfig(scJSON)
245}
246
247func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
248 var updates []string
249 var oldSC, newSC *ServiceConfig
250 var oldOK, newOK bool
251 if ccr.curState.ServiceConfig != nil {
252 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
253 }
254 if s.ServiceConfig != nil {
255 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
256 }
257 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
258 updates = append(updates, "service config updated")
259 }
260 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
261 updates = append(updates, "resolver returned an empty address list")
262 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
263 updates = append(updates, "resolver returned new addresses")
264 }
265 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
266 Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
267 Severity: channelz.CtINFO,
268 })
269}