blob: 7dcefcfa0f647e7c0ea0ff92fbbe55f07b8732ab [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "strings"
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000024 "sync"
25 "time"
Zack Williamse940c7a2019-08-21 14:25:39 -070026
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000027 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/credentials"
Zack Williamse940c7a2019-08-21 14:25:39 -070029 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000031 "google.golang.org/grpc/internal/grpcsync"
Zack Williamse940c7a2019-08-21 14:25:39 -070032 "google.golang.org/grpc/resolver"
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000033 "google.golang.org/grpc/serviceconfig"
Zack Williamse940c7a2019-08-21 14:25:39 -070034)
35
36// ccResolverWrapper is a wrapper on top of cc for resolvers.
37// It implements resolver.ClientConnection interface.
38type ccResolverWrapper struct {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000039 cc *ClientConn
40 resolverMu sync.Mutex
41 resolver resolver.Resolver
42 done *grpcsync.Event
43 curState resolver.State
44
45 pollingMu sync.Mutex
46 polling chan struct{}
Zack Williamse940c7a2019-08-21 14:25:39 -070047}
48
49// split2 returns the values from strings.SplitN(s, sep, 2).
50// If sep is not found, it returns ("", "", false) instead.
51func split2(s, sep string) (string, string, bool) {
52 spl := strings.SplitN(s, sep, 2)
53 if len(spl) < 2 {
54 return "", "", false
55 }
56 return spl[0], spl[1], true
57}
58
59// parseTarget splits target into a struct containing scheme, authority and
60// endpoint.
61//
62// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
63// target}.
64func parseTarget(target string) (ret resolver.Target) {
65 var ok bool
66 ret.Scheme, ret.Endpoint, ok = split2(target, "://")
67 if !ok {
68 return resolver.Target{Endpoint: target}
69 }
70 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
71 if !ok {
72 return resolver.Target{Endpoint: target}
73 }
74 return ret
75}
76
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000077// newCCResolverWrapper uses the resolver.Builder stored in the ClientConn to
78// build a Resolver and returns a ccResolverWrapper object which wraps the
79// newly built resolver.
Zack Williamse940c7a2019-08-21 14:25:39 -070080func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
81 rb := cc.dopts.resolverBuilder
82 if rb == nil {
83 return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
84 }
85
86 ccr := &ccResolverWrapper{
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000087 cc: cc,
88 done: grpcsync.NewEvent(),
89 }
90
91 var credsClone credentials.TransportCredentials
92 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
93 credsClone = creds.Clone()
94 }
95 rbo := resolver.BuildOption{
96 DisableServiceConfig: cc.dopts.disableServiceConfig,
97 DialCreds: credsClone,
98 CredsBundle: cc.dopts.copts.CredsBundle,
99 Dialer: cc.dopts.copts.Dialer,
Zack Williamse940c7a2019-08-21 14:25:39 -0700100 }
101
102 var err error
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000103 // We need to hold the lock here while we assign to the ccr.resolver field
104 // to guard against a data race caused by the following code path,
105 // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
106 // accessing ccr.resolver which is being assigned here.
107 ccr.resolverMu.Lock()
108 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
Zack Williamse940c7a2019-08-21 14:25:39 -0700109 if err != nil {
110 return nil, err
111 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000112 ccr.resolverMu.Unlock()
Zack Williamse940c7a2019-08-21 14:25:39 -0700113 return ccr, nil
114}
115
116func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000117 ccr.resolverMu.Lock()
118 if !ccr.done.HasFired() {
119 ccr.resolver.ResolveNow(o)
120 }
121 ccr.resolverMu.Unlock()
Zack Williamse940c7a2019-08-21 14:25:39 -0700122}
123
124func (ccr *ccResolverWrapper) close() {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000125 ccr.resolverMu.Lock()
Zack Williamse940c7a2019-08-21 14:25:39 -0700126 ccr.resolver.Close()
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000127 ccr.done.Fire()
128 ccr.resolverMu.Unlock()
Zack Williamse940c7a2019-08-21 14:25:39 -0700129}
130
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000131// poll begins or ends asynchronous polling of the resolver based on whether
132// err is ErrBadResolverState.
133func (ccr *ccResolverWrapper) poll(err error) {
134 ccr.pollingMu.Lock()
135 defer ccr.pollingMu.Unlock()
136 if err != balancer.ErrBadResolverState {
137 // stop polling
138 if ccr.polling != nil {
139 close(ccr.polling)
140 ccr.polling = nil
141 }
142 return
143 }
144 if ccr.polling != nil {
145 // already polling
146 return
147 }
148 p := make(chan struct{})
149 ccr.polling = p
150 go func() {
151 for i := 0; ; i++ {
152 ccr.resolveNow(resolver.ResolveNowOption{})
153 t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
154 select {
155 case <-p:
156 t.Stop()
157 return
158 case <-ccr.done.Done():
159 // Resolver has been closed.
160 t.Stop()
161 return
162 case <-t.C:
163 select {
164 case <-p:
165 return
166 default:
167 }
168 // Timer expired; re-resolve.
169 }
170 }
171 }()
Zack Williamse940c7a2019-08-21 14:25:39 -0700172}
173
174func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000175 if ccr.done.HasFired() {
Zack Williamse940c7a2019-08-21 14:25:39 -0700176 return
177 }
178 grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
179 if channelz.IsOn() {
180 ccr.addChannelzTraceEvent(s)
181 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700182 ccr.curState = s
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000183 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
184}
185
186func (ccr *ccResolverWrapper) ReportError(err error) {
187 if ccr.done.HasFired() {
188 return
189 }
190 grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
191 if channelz.IsOn() {
192 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
193 Desc: fmt.Sprintf("Resolver reported error: %v", err),
194 Severity: channelz.CtWarning,
195 })
196 }
197 ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
Zack Williamse940c7a2019-08-21 14:25:39 -0700198}
199
200// NewAddress is called by the resolver implementation to send addresses to gRPC.
201func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000202 if ccr.done.HasFired() {
Zack Williamse940c7a2019-08-21 14:25:39 -0700203 return
204 }
205 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
206 if channelz.IsOn() {
207 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
208 }
209 ccr.curState.Addresses = addrs
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000210 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
Zack Williamse940c7a2019-08-21 14:25:39 -0700211}
212
213// NewServiceConfig is called by the resolver implementation to send service
214// configs to gRPC.
215func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000216 if ccr.done.HasFired() {
Zack Williamse940c7a2019-08-21 14:25:39 -0700217 return
218 }
219 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000220 scpr := parseServiceConfig(sc)
221 if scpr.Err != nil {
222 grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
223 if channelz.IsOn() {
224 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
225 Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
226 Severity: channelz.CtWarning,
227 })
228 }
229 ccr.poll(balancer.ErrBadResolverState)
divyadesai19009132020-03-04 12:58:08 +0000230 return
Zack Williamse940c7a2019-08-21 14:25:39 -0700231 }
divyadesai19009132020-03-04 12:58:08 +0000232 if channelz.IsOn() {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000233 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
divyadesai19009132020-03-04 12:58:08 +0000234 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000235 ccr.curState.ServiceConfig = scpr
236 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
237}
238
239func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
240 return parseServiceConfig(scJSON)
Zack Williamse940c7a2019-08-21 14:25:39 -0700241}
242
243func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
Zack Williamse940c7a2019-08-21 14:25:39 -0700244 var updates []string
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000245 var oldSC, newSC *ServiceConfig
246 var oldOK, newOK bool
247 if ccr.curState.ServiceConfig != nil {
248 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
249 }
250 if s.ServiceConfig != nil {
251 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
252 }
divyadesai19009132020-03-04 12:58:08 +0000253 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
Zack Williamse940c7a2019-08-21 14:25:39 -0700254 updates = append(updates, "service config updated")
255 }
256 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
257 updates = append(updates, "resolver returned an empty address list")
258 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
259 updates = append(updates, "resolver returned new addresses")
260 }
261 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
262 Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
263 Severity: channelz.CtINFO,
264 })
265}