blob: 3eaf724cd6d05fa1d34c6c0429c1af7bd129069b [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "strings"
24 "sync"
25 "time"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/credentials"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/internal/grpcsync"
32 "google.golang.org/grpc/resolver"
33 "google.golang.org/grpc/serviceconfig"
34)
35
36// ccResolverWrapper is a wrapper on top of cc for resolvers.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +000037// It implements resolver.ClientConn interface.
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +000038type ccResolverWrapper struct {
39 cc *ClientConn
40 resolverMu sync.Mutex
41 resolver resolver.Resolver
42 done *grpcsync.Event
43 curState resolver.State
44
45 pollingMu sync.Mutex
46 polling chan struct{}
47}
48
49// split2 returns the values from strings.SplitN(s, sep, 2).
50// If sep is not found, it returns ("", "", false) instead.
51func split2(s, sep string) (string, string, bool) {
52 spl := strings.SplitN(s, sep, 2)
53 if len(spl) < 2 {
54 return "", "", false
55 }
56 return spl[0], spl[1], true
57}
58
59// parseTarget splits target into a struct containing scheme, authority and
60// endpoint.
61//
62// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
63// target}.
64func parseTarget(target string) (ret resolver.Target) {
65 var ok bool
66 ret.Scheme, ret.Endpoint, ok = split2(target, "://")
67 if !ok {
68 return resolver.Target{Endpoint: target}
69 }
70 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
71 if !ok {
72 return resolver.Target{Endpoint: target}
73 }
74 return ret
75}
76
Dinesh Belwalkar396b6522020-02-06 22:11:53 +000077// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
78// returns a ccResolverWrapper object which wraps the newly built resolver.
79func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +000080 ccr := &ccResolverWrapper{
81 cc: cc,
82 done: grpcsync.NewEvent(),
83 }
84
85 var credsClone credentials.TransportCredentials
86 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
87 credsClone = creds.Clone()
88 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +000089 rbo := resolver.BuildOptions{
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +000090 DisableServiceConfig: cc.dopts.disableServiceConfig,
91 DialCreds: credsClone,
92 CredsBundle: cc.dopts.copts.CredsBundle,
93 Dialer: cc.dopts.copts.Dialer,
94 }
95
96 var err error
97 // We need to hold the lock here while we assign to the ccr.resolver field
98 // to guard against a data race caused by the following code path,
99 // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
100 // accessing ccr.resolver which is being assigned here.
101 ccr.resolverMu.Lock()
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000102 defer ccr.resolverMu.Unlock()
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000103 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
104 if err != nil {
105 return nil, err
106 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000107 return ccr, nil
108}
109
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000110func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000111 ccr.resolverMu.Lock()
112 if !ccr.done.HasFired() {
113 ccr.resolver.ResolveNow(o)
114 }
115 ccr.resolverMu.Unlock()
116}
117
118func (ccr *ccResolverWrapper) close() {
119 ccr.resolverMu.Lock()
120 ccr.resolver.Close()
121 ccr.done.Fire()
122 ccr.resolverMu.Unlock()
123}
124
125// poll begins or ends asynchronous polling of the resolver based on whether
126// err is ErrBadResolverState.
127func (ccr *ccResolverWrapper) poll(err error) {
128 ccr.pollingMu.Lock()
129 defer ccr.pollingMu.Unlock()
130 if err != balancer.ErrBadResolverState {
131 // stop polling
132 if ccr.polling != nil {
133 close(ccr.polling)
134 ccr.polling = nil
135 }
136 return
137 }
138 if ccr.polling != nil {
139 // already polling
140 return
141 }
142 p := make(chan struct{})
143 ccr.polling = p
144 go func() {
145 for i := 0; ; i++ {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000146 ccr.resolveNow(resolver.ResolveNowOptions{})
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000147 t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
148 select {
149 case <-p:
150 t.Stop()
151 return
152 case <-ccr.done.Done():
153 // Resolver has been closed.
154 t.Stop()
155 return
156 case <-t.C:
157 select {
158 case <-p:
159 return
160 default:
161 }
162 // Timer expired; re-resolve.
163 }
164 }
165 }()
166}
167
168func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
169 if ccr.done.HasFired() {
170 return
171 }
172 grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
173 if channelz.IsOn() {
174 ccr.addChannelzTraceEvent(s)
175 }
176 ccr.curState = s
177 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
178}
179
180func (ccr *ccResolverWrapper) ReportError(err error) {
181 if ccr.done.HasFired() {
182 return
183 }
184 grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
185 if channelz.IsOn() {
186 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
187 Desc: fmt.Sprintf("Resolver reported error: %v", err),
188 Severity: channelz.CtWarning,
189 })
190 }
191 ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
192}
193
194// NewAddress is called by the resolver implementation to send addresses to gRPC.
195func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
196 if ccr.done.HasFired() {
197 return
198 }
199 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
200 if channelz.IsOn() {
201 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
202 }
203 ccr.curState.Addresses = addrs
204 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
205}
206
207// NewServiceConfig is called by the resolver implementation to send service
208// configs to gRPC.
209func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
210 if ccr.done.HasFired() {
211 return
212 }
213 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000214 if ccr.cc.dopts.disableServiceConfig {
215 grpclog.Infof("Service config lookups disabled; ignoring config")
216 return
217 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000218 scpr := parseServiceConfig(sc)
219 if scpr.Err != nil {
220 grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
221 if channelz.IsOn() {
222 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
223 Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
224 Severity: channelz.CtWarning,
225 })
226 }
227 ccr.poll(balancer.ErrBadResolverState)
228 return
229 }
230 if channelz.IsOn() {
231 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
232 }
233 ccr.curState.ServiceConfig = scpr
234 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
235}
236
237func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
238 return parseServiceConfig(scJSON)
239}
240
241func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
242 var updates []string
243 var oldSC, newSC *ServiceConfig
244 var oldOK, newOK bool
245 if ccr.curState.ServiceConfig != nil {
246 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
247 }
248 if s.ServiceConfig != nil {
249 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
250 }
251 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
252 updates = append(updates, "service config updated")
253 }
254 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
255 updates = append(updates, "resolver returned an empty address list")
256 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
257 updates = append(updates, "resolver returned new addresses")
258 }
259 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
260 Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
261 Severity: channelz.CtINFO,
262 })
263}