blob: 22c4240cf7e82720b8b2c9563b2a13042d08fd51 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "encoding/json"
23 "errors"
24 "fmt"
25 "reflect"
26 "strconv"
27 "strings"
28 "time"
29
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/internal"
32 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
33 "google.golang.org/grpc/serviceconfig"
34)
35
36const maxInt = int(^uint(0) >> 1)
37
38// MethodConfig defines the configuration recommended by the service providers for a
39// particular method.
40//
41// Deprecated: Users should not use this struct. Service config should be received
42// through name resolver, as specified here
43// https://github.com/grpc/grpc/blob/master/doc/service_config.md
44type MethodConfig = internalserviceconfig.MethodConfig
45
46type lbConfig struct {
47 name string
48 cfg serviceconfig.LoadBalancingConfig
49}
50
51// ServiceConfig is provided by the service provider and contains parameters for how
52// clients that connect to the service should behave.
53//
54// Deprecated: Users should not use this struct. Service config should be received
55// through name resolver, as specified here
56// https://github.com/grpc/grpc/blob/master/doc/service_config.md
57type ServiceConfig struct {
58 serviceconfig.Config
59
60 // LB is the load balancer the service providers recommends. The balancer
61 // specified via grpc.WithBalancerName will override this. This is deprecated;
62 // lbConfigs is preferred. If lbConfig and LB are both present, lbConfig
63 // will be used.
64 LB *string
65
66 // lbConfig is the service config's load balancing configuration. If
67 // lbConfig and LB are both present, lbConfig will be used.
68 lbConfig *lbConfig
69
70 // Methods contains a map for the methods in this service. If there is an
71 // exact match for a method (i.e. /service/method) in the map, use the
72 // corresponding MethodConfig. If there's no exact match, look for the
73 // default config for the service (/service/) and use the corresponding
74 // MethodConfig if it exists. Otherwise, the method has no MethodConfig to
75 // use.
76 Methods map[string]MethodConfig
77
78 // If a retryThrottlingPolicy is provided, gRPC will automatically throttle
79 // retry attempts and hedged RPCs when the client’s ratio of failures to
80 // successes exceeds a threshold.
81 //
82 // For each server name, the gRPC client will maintain a token_count which is
83 // initially set to maxTokens, and can take values between 0 and maxTokens.
84 //
85 // Every outgoing RPC (regardless of service or method invoked) will change
86 // token_count as follows:
87 //
88 // - Every failed RPC will decrement the token_count by 1.
89 // - Every successful RPC will increment the token_count by tokenRatio.
90 //
91 // If token_count is less than or equal to maxTokens / 2, then RPCs will not
92 // be retried and hedged RPCs will not be sent.
93 retryThrottling *retryThrottlingPolicy
94 // healthCheckConfig must be set as one of the requirement to enable LB channel
95 // health check.
96 healthCheckConfig *healthCheckConfig
97 // rawJSONString stores service config json string that get parsed into
98 // this service config struct.
99 rawJSONString string
100}
101
102// healthCheckConfig defines the go-native version of the LB channel health check config.
103type healthCheckConfig struct {
104 // serviceName is the service name to use in the health-checking request.
105 ServiceName string
106}
107
108type jsonRetryPolicy struct {
109 MaxAttempts int
110 InitialBackoff string
111 MaxBackoff string
112 BackoffMultiplier float64
113 RetryableStatusCodes []codes.Code
114}
115
116// retryThrottlingPolicy defines the go-native version of the retry throttling
117// policy defined by the service config here:
118// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
119type retryThrottlingPolicy struct {
120 // The number of tokens starts at maxTokens. The token_count will always be
121 // between 0 and maxTokens.
122 //
123 // This field is required and must be greater than zero.
124 MaxTokens float64
125 // The amount of tokens to add on each successful RPC. Typically this will
126 // be some number between 0 and 1, e.g., 0.1.
127 //
128 // This field is required and must be greater than zero. Up to 3 decimal
129 // places are supported.
130 TokenRatio float64
131}
132
133func parseDuration(s *string) (*time.Duration, error) {
134 if s == nil {
135 return nil, nil
136 }
137 if !strings.HasSuffix(*s, "s") {
138 return nil, fmt.Errorf("malformed duration %q", *s)
139 }
140 ss := strings.SplitN((*s)[:len(*s)-1], ".", 3)
141 if len(ss) > 2 {
142 return nil, fmt.Errorf("malformed duration %q", *s)
143 }
144 // hasDigits is set if either the whole or fractional part of the number is
145 // present, since both are optional but one is required.
146 hasDigits := false
147 var d time.Duration
148 if len(ss[0]) > 0 {
149 i, err := strconv.ParseInt(ss[0], 10, 32)
150 if err != nil {
151 return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
152 }
153 d = time.Duration(i) * time.Second
154 hasDigits = true
155 }
156 if len(ss) == 2 && len(ss[1]) > 0 {
157 if len(ss[1]) > 9 {
158 return nil, fmt.Errorf("malformed duration %q", *s)
159 }
160 f, err := strconv.ParseInt(ss[1], 10, 64)
161 if err != nil {
162 return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
163 }
164 for i := 9; i > len(ss[1]); i-- {
165 f *= 10
166 }
167 d += time.Duration(f)
168 hasDigits = true
169 }
170 if !hasDigits {
171 return nil, fmt.Errorf("malformed duration %q", *s)
172 }
173
174 return &d, nil
175}
176
177type jsonName struct {
178 Service string
179 Method string
180}
181
182var (
183 errDuplicatedName = errors.New("duplicated name")
184 errEmptyServiceNonEmptyMethod = errors.New("cannot combine empty 'service' and non-empty 'method'")
185)
186
187func (j jsonName) generatePath() (string, error) {
188 if j.Service == "" {
189 if j.Method != "" {
190 return "", errEmptyServiceNonEmptyMethod
191 }
192 return "", nil
193 }
194 res := "/" + j.Service + "/"
195 if j.Method != "" {
196 res += j.Method
197 }
198 return res, nil
199}
200
201// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
202type jsonMC struct {
203 Name *[]jsonName
204 WaitForReady *bool
205 Timeout *string
206 MaxRequestMessageBytes *int64
207 MaxResponseMessageBytes *int64
208 RetryPolicy *jsonRetryPolicy
209}
210
211// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
212type jsonSC struct {
213 LoadBalancingPolicy *string
214 LoadBalancingConfig *internalserviceconfig.BalancerConfig
215 MethodConfig *[]jsonMC
216 RetryThrottling *retryThrottlingPolicy
217 HealthCheckConfig *healthCheckConfig
218}
219
220func init() {
221 internal.ParseServiceConfigForTesting = parseServiceConfig
222}
223func parseServiceConfig(js string) *serviceconfig.ParseResult {
224 if len(js) == 0 {
225 return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
226 }
227 var rsc jsonSC
228 err := json.Unmarshal([]byte(js), &rsc)
229 if err != nil {
230 logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
231 return &serviceconfig.ParseResult{Err: err}
232 }
233 sc := ServiceConfig{
234 LB: rsc.LoadBalancingPolicy,
235 Methods: make(map[string]MethodConfig),
236 retryThrottling: rsc.RetryThrottling,
237 healthCheckConfig: rsc.HealthCheckConfig,
238 rawJSONString: js,
239 }
240 if c := rsc.LoadBalancingConfig; c != nil {
241 sc.lbConfig = &lbConfig{
242 name: c.Name,
243 cfg: c.Config,
244 }
245 }
246
247 if rsc.MethodConfig == nil {
248 return &serviceconfig.ParseResult{Config: &sc}
249 }
250
251 paths := map[string]struct{}{}
252 for _, m := range *rsc.MethodConfig {
253 if m.Name == nil {
254 continue
255 }
256 d, err := parseDuration(m.Timeout)
257 if err != nil {
258 logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
259 return &serviceconfig.ParseResult{Err: err}
260 }
261
262 mc := MethodConfig{
263 WaitForReady: m.WaitForReady,
264 Timeout: d,
265 }
266 if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
267 logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
268 return &serviceconfig.ParseResult{Err: err}
269 }
270 if m.MaxRequestMessageBytes != nil {
271 if *m.MaxRequestMessageBytes > int64(maxInt) {
272 mc.MaxReqSize = newInt(maxInt)
273 } else {
274 mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
275 }
276 }
277 if m.MaxResponseMessageBytes != nil {
278 if *m.MaxResponseMessageBytes > int64(maxInt) {
279 mc.MaxRespSize = newInt(maxInt)
280 } else {
281 mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
282 }
283 }
284 for i, n := range *m.Name {
285 path, err := n.generatePath()
286 if err != nil {
287 logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to methodConfig[%d]: %v", js, i, err)
288 return &serviceconfig.ParseResult{Err: err}
289 }
290
291 if _, ok := paths[path]; ok {
292 err = errDuplicatedName
293 logger.Warningf("grpc: parseServiceConfig error unmarshaling %s due to methodConfig[%d]: %v", js, i, err)
294 return &serviceconfig.ParseResult{Err: err}
295 }
296 paths[path] = struct{}{}
297 sc.Methods[path] = mc
298 }
299 }
300
301 if sc.retryThrottling != nil {
302 if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
303 return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
304 }
305 if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
306 return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
307 }
308 }
309 return &serviceconfig.ParseResult{Config: &sc}
310}
311
312func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPolicy, err error) {
313 if jrp == nil {
314 return nil, nil
315 }
316 ib, err := parseDuration(&jrp.InitialBackoff)
317 if err != nil {
318 return nil, err
319 }
320 mb, err := parseDuration(&jrp.MaxBackoff)
321 if err != nil {
322 return nil, err
323 }
324
325 if jrp.MaxAttempts <= 1 ||
326 *ib <= 0 ||
327 *mb <= 0 ||
328 jrp.BackoffMultiplier <= 0 ||
329 len(jrp.RetryableStatusCodes) == 0 {
330 logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
331 return nil, nil
332 }
333
334 rp := &internalserviceconfig.RetryPolicy{
335 MaxAttempts: jrp.MaxAttempts,
336 InitialBackoff: *ib,
337 MaxBackoff: *mb,
338 BackoffMultiplier: jrp.BackoffMultiplier,
339 RetryableStatusCodes: make(map[codes.Code]bool),
340 }
341 if rp.MaxAttempts > 5 {
342 // TODO(retry): Make the max maxAttempts configurable.
343 rp.MaxAttempts = 5
344 }
345 for _, code := range jrp.RetryableStatusCodes {
346 rp.RetryableStatusCodes[code] = true
347 }
348 return rp, nil
349}
350
351func min(a, b *int) *int {
352 if *a < *b {
353 return a
354 }
355 return b
356}
357
358func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
359 if mcMax == nil && doptMax == nil {
360 return &defaultVal
361 }
362 if mcMax != nil && doptMax != nil {
363 return min(mcMax, doptMax)
364 }
365 if mcMax != nil {
366 return mcMax
367 }
368 return doptMax
369}
370
371func newInt(b int) *int {
372 return &b
373}
374
375func init() {
376 internal.EqualServiceConfigForTesting = equalServiceConfig
377}
378
379// equalServiceConfig compares two configs. The rawJSONString field is ignored,
380// because they may diff in white spaces.
381//
382// If any of them is NOT *ServiceConfig, return false.
383func equalServiceConfig(a, b serviceconfig.Config) bool {
384 aa, ok := a.(*ServiceConfig)
385 if !ok {
386 return false
387 }
388 bb, ok := b.(*ServiceConfig)
389 if !ok {
390 return false
391 }
392 aaRaw := aa.rawJSONString
393 aa.rawJSONString = ""
394 bbRaw := bb.rawJSONString
395 bb.rawJSONString = ""
396 defer func() {
397 aa.rawJSONString = aaRaw
398 bb.rawJSONString = bbRaw
399 }()
400 // Using reflect.DeepEqual instead of cmp.Equal because many balancer
401 // configs are unexported, and cmp.Equal cannot compare unexported fields
402 // from unexported structs.
403 return reflect.DeepEqual(aa, bb)
404}