blob: 7dcefcfa0f647e7c0ea0ff92fbbe55f07b8732ab [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "strings"
Don Newtone0d34a82019-11-14 10:58:06 -050024 "sync"
25 "time"
Don Newton98fd8812019-09-23 15:15:02 -040026
Don Newtone0d34a82019-11-14 10:58:06 -050027 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/credentials"
Don Newton98fd8812019-09-23 15:15:02 -040029 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
Don Newtone0d34a82019-11-14 10:58:06 -050031 "google.golang.org/grpc/internal/grpcsync"
Don Newton98fd8812019-09-23 15:15:02 -040032 "google.golang.org/grpc/resolver"
Don Newtone0d34a82019-11-14 10:58:06 -050033 "google.golang.org/grpc/serviceconfig"
Don Newton98fd8812019-09-23 15:15:02 -040034)
35
36// ccResolverWrapper is a wrapper on top of cc for resolvers.
37// It implements resolver.ClientConnection interface.
38type ccResolverWrapper struct {
Don Newtone0d34a82019-11-14 10:58:06 -050039 cc *ClientConn
40 resolverMu sync.Mutex
41 resolver resolver.Resolver
42 done *grpcsync.Event
43 curState resolver.State
44
45 pollingMu sync.Mutex
46 polling chan struct{}
Don Newton98fd8812019-09-23 15:15:02 -040047}
48
49// split2 returns the values from strings.SplitN(s, sep, 2).
50// If sep is not found, it returns ("", "", false) instead.
51func split2(s, sep string) (string, string, bool) {
52 spl := strings.SplitN(s, sep, 2)
53 if len(spl) < 2 {
54 return "", "", false
55 }
56 return spl[0], spl[1], true
57}
58
59// parseTarget splits target into a struct containing scheme, authority and
60// endpoint.
61//
62// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
63// target}.
64func parseTarget(target string) (ret resolver.Target) {
65 var ok bool
66 ret.Scheme, ret.Endpoint, ok = split2(target, "://")
67 if !ok {
68 return resolver.Target{Endpoint: target}
69 }
70 ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
71 if !ok {
72 return resolver.Target{Endpoint: target}
73 }
74 return ret
75}
76
Don Newtone0d34a82019-11-14 10:58:06 -050077// newCCResolverWrapper uses the resolver.Builder stored in the ClientConn to
78// build a Resolver and returns a ccResolverWrapper object which wraps the
79// newly built resolver.
Don Newton98fd8812019-09-23 15:15:02 -040080func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
81 rb := cc.dopts.resolverBuilder
82 if rb == nil {
83 return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
84 }
85
86 ccr := &ccResolverWrapper{
Don Newtone0d34a82019-11-14 10:58:06 -050087 cc: cc,
88 done: grpcsync.NewEvent(),
89 }
90
91 var credsClone credentials.TransportCredentials
92 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
93 credsClone = creds.Clone()
94 }
95 rbo := resolver.BuildOption{
96 DisableServiceConfig: cc.dopts.disableServiceConfig,
97 DialCreds: credsClone,
98 CredsBundle: cc.dopts.copts.CredsBundle,
99 Dialer: cc.dopts.copts.Dialer,
Don Newton98fd8812019-09-23 15:15:02 -0400100 }
101
102 var err error
Don Newtone0d34a82019-11-14 10:58:06 -0500103 // We need to hold the lock here while we assign to the ccr.resolver field
104 // to guard against a data race caused by the following code path,
105 // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
106 // accessing ccr.resolver which is being assigned here.
107 ccr.resolverMu.Lock()
108 ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
Don Newton98fd8812019-09-23 15:15:02 -0400109 if err != nil {
110 return nil, err
111 }
Don Newtone0d34a82019-11-14 10:58:06 -0500112 ccr.resolverMu.Unlock()
Don Newton98fd8812019-09-23 15:15:02 -0400113 return ccr, nil
114}
115
116func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
Don Newtone0d34a82019-11-14 10:58:06 -0500117 ccr.resolverMu.Lock()
118 if !ccr.done.HasFired() {
119 ccr.resolver.ResolveNow(o)
120 }
121 ccr.resolverMu.Unlock()
Don Newton98fd8812019-09-23 15:15:02 -0400122}
123
124func (ccr *ccResolverWrapper) close() {
Don Newtone0d34a82019-11-14 10:58:06 -0500125 ccr.resolverMu.Lock()
Don Newton98fd8812019-09-23 15:15:02 -0400126 ccr.resolver.Close()
Don Newtone0d34a82019-11-14 10:58:06 -0500127 ccr.done.Fire()
128 ccr.resolverMu.Unlock()
Don Newton98fd8812019-09-23 15:15:02 -0400129}
130
Don Newtone0d34a82019-11-14 10:58:06 -0500131// poll begins or ends asynchronous polling of the resolver based on whether
132// err is ErrBadResolverState.
133func (ccr *ccResolverWrapper) poll(err error) {
134 ccr.pollingMu.Lock()
135 defer ccr.pollingMu.Unlock()
136 if err != balancer.ErrBadResolverState {
137 // stop polling
138 if ccr.polling != nil {
139 close(ccr.polling)
140 ccr.polling = nil
141 }
142 return
143 }
144 if ccr.polling != nil {
145 // already polling
146 return
147 }
148 p := make(chan struct{})
149 ccr.polling = p
150 go func() {
151 for i := 0; ; i++ {
152 ccr.resolveNow(resolver.ResolveNowOption{})
153 t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
154 select {
155 case <-p:
156 t.Stop()
157 return
158 case <-ccr.done.Done():
159 // Resolver has been closed.
160 t.Stop()
161 return
162 case <-t.C:
163 select {
164 case <-p:
165 return
166 default:
167 }
168 // Timer expired; re-resolve.
169 }
170 }
171 }()
Don Newton98fd8812019-09-23 15:15:02 -0400172}
173
174func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
Don Newtone0d34a82019-11-14 10:58:06 -0500175 if ccr.done.HasFired() {
Don Newton98fd8812019-09-23 15:15:02 -0400176 return
177 }
178 grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
179 if channelz.IsOn() {
180 ccr.addChannelzTraceEvent(s)
181 }
Don Newton98fd8812019-09-23 15:15:02 -0400182 ccr.curState = s
Don Newtone0d34a82019-11-14 10:58:06 -0500183 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
184}
185
186func (ccr *ccResolverWrapper) ReportError(err error) {
187 if ccr.done.HasFired() {
188 return
189 }
190 grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
191 if channelz.IsOn() {
192 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
193 Desc: fmt.Sprintf("Resolver reported error: %v", err),
194 Severity: channelz.CtWarning,
195 })
196 }
197 ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
Don Newton98fd8812019-09-23 15:15:02 -0400198}
199
200// NewAddress is called by the resolver implementation to send addresses to gRPC.
201func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
Don Newtone0d34a82019-11-14 10:58:06 -0500202 if ccr.done.HasFired() {
Don Newton98fd8812019-09-23 15:15:02 -0400203 return
204 }
205 grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
206 if channelz.IsOn() {
207 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
208 }
209 ccr.curState.Addresses = addrs
Don Newtone0d34a82019-11-14 10:58:06 -0500210 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
Don Newton98fd8812019-09-23 15:15:02 -0400211}
212
213// NewServiceConfig is called by the resolver implementation to send service
214// configs to gRPC.
215func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
Don Newtone0d34a82019-11-14 10:58:06 -0500216 if ccr.done.HasFired() {
Don Newton98fd8812019-09-23 15:15:02 -0400217 return
218 }
219 grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
Don Newtone0d34a82019-11-14 10:58:06 -0500220 scpr := parseServiceConfig(sc)
221 if scpr.Err != nil {
222 grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
223 if channelz.IsOn() {
224 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
225 Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
226 Severity: channelz.CtWarning,
227 })
228 }
229 ccr.poll(balancer.ErrBadResolverState)
Don Newton98fd8812019-09-23 15:15:02 -0400230 return
231 }
232 if channelz.IsOn() {
Don Newtone0d34a82019-11-14 10:58:06 -0500233 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
Don Newton98fd8812019-09-23 15:15:02 -0400234 }
Don Newtone0d34a82019-11-14 10:58:06 -0500235 ccr.curState.ServiceConfig = scpr
236 ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
237}
238
239func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
240 return parseServiceConfig(scJSON)
Don Newton98fd8812019-09-23 15:15:02 -0400241}
242
243func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
244 var updates []string
Don Newtone0d34a82019-11-14 10:58:06 -0500245 var oldSC, newSC *ServiceConfig
246 var oldOK, newOK bool
247 if ccr.curState.ServiceConfig != nil {
248 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
249 }
250 if s.ServiceConfig != nil {
251 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
252 }
Don Newton98fd8812019-09-23 15:15:02 -0400253 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
254 updates = append(updates, "service config updated")
255 }
256 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
257 updates = append(updates, "resolver returned an empty address list")
258 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
259 updates = append(updates, "resolver returned new addresses")
260 }
261 channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
262 Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
263 Severity: channelz.CtINFO,
264 })
265}