blob: b408b3688f2eafdbb1536f0b2b81963fa64037d1 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
Akash Kankanala761955c2024-02-21 19:32:20 +053022 "context"
khenaidoo5fc5cea2021-08-11 17:39:16 -040023 "strings"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
khenaidoo5fc5cea2021-08-11 17:39:16 -040027 "google.golang.org/grpc/internal/channelz"
28 "google.golang.org/grpc/internal/grpcsync"
Akash Kankanala761955c2024-02-21 19:32:20 +053029 "google.golang.org/grpc/internal/pretty"
khenaidoo5fc5cea2021-08-11 17:39:16 -040030 "google.golang.org/grpc/resolver"
31 "google.golang.org/grpc/serviceconfig"
32)
33
Akash Kankanala761955c2024-02-21 19:32:20 +053034// resolverStateUpdater wraps the single method used by ccResolverWrapper to
35// report a state update from the actual resolver implementation.
36type resolverStateUpdater interface {
37 updateResolverState(s resolver.State, err error) error
38}
39
khenaidoo5fc5cea2021-08-11 17:39:16 -040040// ccResolverWrapper is a wrapper on top of cc for resolvers.
41// It implements resolver.ClientConn interface.
42type ccResolverWrapper struct {
Akash Kankanala761955c2024-02-21 19:32:20 +053043 // The following fields are initialized when the wrapper is created and are
44 // read-only afterwards, and therefore can be accessed without a mutex.
45 cc resolverStateUpdater
46 channelzID *channelz.Identifier
47 ignoreServiceConfig bool
48 opts ccResolverWrapperOpts
49 serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
50 serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
khenaidoo5fc5cea2021-08-11 17:39:16 -040051
Akash Kankanala761955c2024-02-21 19:32:20 +053052 // All incoming (resolver --> gRPC) calls are guaranteed to execute in a
53 // mutually exclusive manner as they are scheduled on the serializer.
54 // Fields accessed *only* in these serializer callbacks, can therefore be
55 // accessed without a mutex.
56 curState resolver.State
57
58 // mu guards access to the below fields.
59 mu sync.Mutex
60 closed bool
61 resolver resolver.Resolver // Accessed only from outgoing calls.
62}
63
64// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
65// ccResolverWrapper.
66type ccResolverWrapperOpts struct {
67 target resolver.Target // User specified dial target to resolve.
68 builder resolver.Builder // Resolver builder to use.
69 bOpts resolver.BuildOptions // Resolver build options to use.
70 channelzID *channelz.Identifier // Channelz identifier for the channel.
khenaidoo5fc5cea2021-08-11 17:39:16 -040071}
72
73// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
74// returns a ccResolverWrapper object which wraps the newly built resolver.
Akash Kankanala761955c2024-02-21 19:32:20 +053075func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
76 ctx, cancel := context.WithCancel(context.Background())
khenaidoo5fc5cea2021-08-11 17:39:16 -040077 ccr := &ccResolverWrapper{
Akash Kankanala761955c2024-02-21 19:32:20 +053078 cc: cc,
79 channelzID: opts.channelzID,
80 ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
81 opts: opts,
82 serializer: grpcsync.NewCallbackSerializer(ctx),
83 serializerCancel: cancel,
khenaidoo5fc5cea2021-08-11 17:39:16 -040084 }
85
Akash Kankanala761955c2024-02-21 19:32:20 +053086 // Cannot hold the lock at build time because the resolver can send an
87 // update or error inline and these incoming calls grab the lock to schedule
88 // a callback in the serializer.
89 r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
khenaidoo5fc5cea2021-08-11 17:39:16 -040090 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +053091 cancel()
khenaidoo5fc5cea2021-08-11 17:39:16 -040092 return nil, err
93 }
Akash Kankanala761955c2024-02-21 19:32:20 +053094
95 // Any error reported by the resolver at build time that leads to a
96 // re-resolution request from the balancer is dropped by grpc until we
97 // return from this function. So, we don't have to handle pending resolveNow
98 // requests here.
99 ccr.mu.Lock()
100 ccr.resolver = r
101 ccr.mu.Unlock()
102
khenaidoo5fc5cea2021-08-11 17:39:16 -0400103 return ccr, nil
104}
105
106func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530107 ccr.mu.Lock()
108 defer ccr.mu.Unlock()
109
110 // ccr.resolver field is set only after the call to Build() returns. But in
111 // the process of building, the resolver may send an error update which when
112 // propagated to the balancer may result in a re-resolution request.
113 if ccr.closed || ccr.resolver == nil {
114 return
khenaidoo5fc5cea2021-08-11 17:39:16 -0400115 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530116 ccr.resolver.ResolveNow(o)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400117}
118
119func (ccr *ccResolverWrapper) close() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530120 ccr.mu.Lock()
121 if ccr.closed {
122 ccr.mu.Unlock()
123 return
124 }
125
126 channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
127
128 // Close the serializer to ensure that no more calls from the resolver are
129 // handled, before actually closing the resolver.
130 ccr.serializerCancel()
131 ccr.closed = true
132 r := ccr.resolver
133 ccr.mu.Unlock()
134
135 // Give enqueued callbacks a chance to finish.
136 <-ccr.serializer.Done
137
138 // Spawn a goroutine to close the resolver (since it may block trying to
139 // cleanup all allocated resources) and return early.
140 go r.Close()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400141}
142
Akash Kankanala761955c2024-02-21 19:32:20 +0530143// serializerScheduleLocked is a convenience method to schedule a function to be
144// run on the serializer while holding ccr.mu.
145func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
146 ccr.mu.Lock()
147 ccr.serializer.Schedule(f)
148 ccr.mu.Unlock()
149}
150
151// UpdateState is called by resolver implementations to report new state to gRPC
152// which includes addresses and service config.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400153func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
Akash Kankanala761955c2024-02-21 19:32:20 +0530154 errCh := make(chan error, 1)
155 ok := ccr.serializer.Schedule(func(context.Context) {
156 ccr.addChannelzTraceEvent(s)
157 ccr.curState = s
158 if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
159 errCh <- balancer.ErrBadResolverState
160 return
161 }
162 errCh <- nil
163 })
164 if !ok {
165 // The only time when Schedule() fail to add the callback to the
166 // serializer is when the serializer is closed, and this happens only
167 // when the resolver wrapper is closed.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400168 return nil
169 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530170 return <-errCh
khenaidoo5fc5cea2021-08-11 17:39:16 -0400171}
172
Akash Kankanala761955c2024-02-21 19:32:20 +0530173// ReportError is called by resolver implementations to report errors
174// encountered during name resolution to gRPC.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400175func (ccr *ccResolverWrapper) ReportError(err error) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530176 ccr.serializerScheduleLocked(func(_ context.Context) {
177 channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
178 ccr.cc.updateResolverState(resolver.State{}, err)
179 })
khenaidoo5fc5cea2021-08-11 17:39:16 -0400180}
181
Akash Kankanala761955c2024-02-21 19:32:20 +0530182// NewAddress is called by the resolver implementation to send addresses to
183// gRPC.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400184func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530185 ccr.serializerScheduleLocked(func(_ context.Context) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400186 ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
Akash Kankanala761955c2024-02-21 19:32:20 +0530187 ccr.curState.Addresses = addrs
188 ccr.cc.updateResolverState(ccr.curState, nil)
189 })
khenaidoo5fc5cea2021-08-11 17:39:16 -0400190}
191
192// NewServiceConfig is called by the resolver implementation to send service
193// configs to gRPC.
194func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530195 ccr.serializerScheduleLocked(func(_ context.Context) {
196 channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
197 if ccr.ignoreServiceConfig {
198 channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
199 return
200 }
201 scpr := parseServiceConfig(sc)
202 if scpr.Err != nil {
203 channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
204 return
205 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400206 ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
Akash Kankanala761955c2024-02-21 19:32:20 +0530207 ccr.curState.ServiceConfig = scpr
208 ccr.cc.updateResolverState(ccr.curState, nil)
209 })
khenaidoo5fc5cea2021-08-11 17:39:16 -0400210}
211
Akash Kankanala761955c2024-02-21 19:32:20 +0530212// ParseServiceConfig is called by resolver implementations to parse a JSON
213// representation of the service config.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400214func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
215 return parseServiceConfig(scJSON)
216}
217
Akash Kankanala761955c2024-02-21 19:32:20 +0530218// addChannelzTraceEvent adds a channelz trace event containing the new
219// state received from resolver implementations.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400220func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
221 var updates []string
222 var oldSC, newSC *ServiceConfig
223 var oldOK, newOK bool
224 if ccr.curState.ServiceConfig != nil {
225 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
226 }
227 if s.ServiceConfig != nil {
228 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
229 }
230 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
231 updates = append(updates, "service config updated")
232 }
233 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
234 updates = append(updates, "resolver returned an empty address list")
235 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
236 updates = append(updates, "resolver returned new addresses")
237 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530238 channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
khenaidoo5fc5cea2021-08-11 17:39:16 -0400239}