gRPC migration
Change-Id: Ib390f6dde0d5a8d6db12ccd7da41135570ad1354
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index 8b10516..4062c02 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -98,7 +98,7 @@
// New methodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *methodLoggerConfig) error {
if _, ok := l.services[service]; ok {
- return fmt.Errorf("conflicting service rules for service %v found", service)
+ return fmt.Errorf("conflicting rules for service %v found", service)
}
if l.services == nil {
l.services = make(map[string]*methodLoggerConfig)
@@ -112,10 +112,10 @@
// New methodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *methodLoggerConfig) error {
if _, ok := l.blacklist[method]; ok {
- return fmt.Errorf("conflicting blacklist rules for method %v found", method)
+ return fmt.Errorf("conflicting rules for method %v found", method)
}
if _, ok := l.methods[method]; ok {
- return fmt.Errorf("conflicting method rules for method %v found", method)
+ return fmt.Errorf("conflicting rules for method %v found", method)
}
if l.methods == nil {
l.methods = make(map[string]*methodLoggerConfig)
@@ -127,10 +127,10 @@
// Set blacklist method for "-service/method".
func (l *logger) setBlacklist(method string) error {
if _, ok := l.blacklist[method]; ok {
- return fmt.Errorf("conflicting blacklist rules for method %v found", method)
+ return fmt.Errorf("conflicting rules for method %v found", method)
}
if _, ok := l.methods[method]; ok {
- return fmt.Errorf("conflicting method rules for method %v found", method)
+ return fmt.Errorf("conflicting rules for method %v found", method)
}
if l.blacklist == nil {
l.blacklist = make(map[string]struct{})
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
index 9f6a0c1..2cb3109 100644
--- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -26,13 +26,6 @@
//
// All methods on this type are thread-safe and don't block on anything except
// the underlying mutex used for synchronization.
-//
-// Unbounded supports values of any type to be stored in it by using a channel
-// of `interface{}`. This means that a call to Put() incurs an extra memory
-// allocation, and also that users need a type assertion while reading. For
-// performance critical code paths, using Unbounded is strongly discouraged and
-// defining a new type specific implementation of this buffer is preferred. See
-// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
mu sync.Mutex
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index e4252e5..f0744f9 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -30,7 +30,7 @@
"sync/atomic"
"time"
- "google.golang.org/grpc/internal/grpclog"
+ "google.golang.org/grpc/grpclog"
)
const (
@@ -216,7 +216,7 @@
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
if pid == 0 {
- grpclog.ErrorDepth(0, "a SubChannel's parent id cannot be 0")
+ grpclog.Error("a SubChannel's parent id cannot be 0")
return 0
}
id := idGen.genID()
@@ -253,7 +253,7 @@
// this listen socket.
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
- grpclog.ErrorDepth(0, "a ListenSocket's parent id cannot be 0")
+ grpclog.Error("a ListenSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
@@ -268,7 +268,7 @@
// this normal socket.
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
- grpclog.ErrorDepth(0, "a NormalSocket's parent id cannot be 0")
+ grpclog.Error("a NormalSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
@@ -294,19 +294,7 @@
}
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
-func AddTraceEvent(id int64, depth int, desc *TraceEventDesc) {
- for d := desc; d != nil; d = d.Parent {
- switch d.Severity {
- case CtUNKNOWN:
- grpclog.InfoDepth(depth+1, d.Desc)
- case CtINFO:
- grpclog.InfoDepth(depth+1, d.Desc)
- case CtWarning:
- grpclog.WarningDepth(depth+1, d.Desc)
- case CtError:
- grpclog.ErrorDepth(depth+1, d.Desc)
- }
- }
+func AddTraceEvent(id int64, desc *TraceEventDesc) {
if getMaxTraceEntry() == 0 {
return
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go
deleted file mode 100644
index 59c7bed..0000000
--- a/vendor/google.golang.org/grpc/internal/channelz/logging.go
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package channelz
-
-import (
- "fmt"
-
- "google.golang.org/grpc/internal/grpclog"
-)
-
-// Info logs through grpclog.Info and adds a trace event if channelz is on.
-func Info(id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtINFO,
- })
- } else {
- grpclog.InfoDepth(1, args...)
- }
-}
-
-// Infof logs through grpclog.Infof and adds a trace event if channelz is on.
-func Infof(id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtINFO,
- })
- } else {
- grpclog.InfoDepth(1, msg)
- }
-}
-
-// Warning logs through grpclog.Warning and adds a trace event if channelz is on.
-func Warning(id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtWarning,
- })
- } else {
- grpclog.WarningDepth(1, args...)
- }
-}
-
-// Warningf logs through grpclog.Warningf and adds a trace event if channelz is on.
-func Warningf(id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtWarning,
- })
- } else {
- grpclog.WarningDepth(1, msg)
- }
-}
-
-// Error logs through grpclog.Error and adds a trace event if channelz is on.
-func Error(id int64, args ...interface{}) {
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: fmt.Sprint(args...),
- Severity: CtError,
- })
- } else {
- grpclog.ErrorDepth(1, args...)
- }
-}
-
-// Errorf logs through grpclog.Errorf and adds a trace event if channelz is on.
-func Errorf(id int64, format string, args ...interface{}) {
- msg := fmt.Sprintf(format, args...)
- if IsOn() {
- AddTraceEvent(id, 1, &TraceEventDesc{
- Desc: msg,
- Severity: CtError,
- })
- } else {
- grpclog.ErrorDepth(1, msg)
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index ae6c897..3ee8740 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,14 +25,11 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
- txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
- // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
- TXTErrIgnore = !strings.EqualFold(os.Getenv(retryStr), "false")
)
diff --git a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go b/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
deleted file mode 100644
index 8c8e19f..0000000
--- a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package grpclog (internal) defines depth logging for grpc.
-package grpclog
-
-// Logger is the logger used for the non-depth log functions.
-var Logger LoggerV2
-
-// DepthLogger is the logger used for the depth log functions.
-var DepthLogger DepthLoggerV2
-
-// InfoDepth logs to the INFO log at the specified depth.
-func InfoDepth(depth int, args ...interface{}) {
- if DepthLogger != nil {
- DepthLogger.InfoDepth(depth, args...)
- } else {
- Logger.Info(args...)
- }
-}
-
-// WarningDepth logs to the WARNING log at the specified depth.
-func WarningDepth(depth int, args ...interface{}) {
- if DepthLogger != nil {
- DepthLogger.WarningDepth(depth, args...)
- } else {
- Logger.Warning(args...)
- }
-}
-
-// ErrorDepth logs to the ERROR log at the specified depth.
-func ErrorDepth(depth int, args ...interface{}) {
- if DepthLogger != nil {
- DepthLogger.ErrorDepth(depth, args...)
- } else {
- Logger.Error(args...)
- }
-}
-
-// FatalDepth logs to the FATAL log at the specified depth.
-func FatalDepth(depth int, args ...interface{}) {
- if DepthLogger != nil {
- DepthLogger.FatalDepth(depth, args...)
- } else {
- Logger.Fatal(args...)
- }
-}
-
-// LoggerV2 does underlying logging work for grpclog.
-// This is a copy of the LoggerV2 defined in the external grpclog package. It
-// is defined here to avoid a circular dependency.
-type LoggerV2 interface {
- // Info logs to INFO log. Arguments are handled in the manner of fmt.Print.
- Info(args ...interface{})
- // Infoln logs to INFO log. Arguments are handled in the manner of fmt.Println.
- Infoln(args ...interface{})
- // Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf.
- Infof(format string, args ...interface{})
- // Warning logs to WARNING log. Arguments are handled in the manner of fmt.Print.
- Warning(args ...interface{})
- // Warningln logs to WARNING log. Arguments are handled in the manner of fmt.Println.
- Warningln(args ...interface{})
- // Warningf logs to WARNING log. Arguments are handled in the manner of fmt.Printf.
- Warningf(format string, args ...interface{})
- // Error logs to ERROR log. Arguments are handled in the manner of fmt.Print.
- Error(args ...interface{})
- // Errorln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
- Errorln(args ...interface{})
- // Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
- Errorf(format string, args ...interface{})
- // Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatal(args ...interface{})
- // Fatalln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatalln(args ...interface{})
- // Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatalf(format string, args ...interface{})
- // V reports whether verbosity level l is at least the requested verbose level.
- V(l int) bool
-}
-
-// DepthLoggerV2 logs at a specified call frame. If a LoggerV2 also implements
-// DepthLoggerV2, the below functions will be called with the appropriate stack
-// depth set for trivial functions the logger may ignore.
-// This is a copy of the DepthLoggerV2 defined in the external grpclog package.
-// It is defined here to avoid a circular dependency.
-//
-// This API is EXPERIMENTAL.
-type DepthLoggerV2 interface {
- // InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Print.
- InfoDepth(depth int, args ...interface{})
- // WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Print.
- WarningDepth(depth int, args ...interface{})
- // ErrorDetph logs to ERROR log at the specified depth. Arguments are handled in the manner of fmt.Print.
- ErrorDepth(depth int, args ...interface{})
- // FatalDepth logs to FATAL log at the specified depth. Arguments are handled in the manner of fmt.Print.
- FatalDepth(depth int, args ...interface{})
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go b/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
deleted file mode 100644
index f6e0dc1..0000000
--- a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpclog
-
-// PrefixLogger does logging with a prefix.
-//
-// Logging method on a nil logs without any prefix.
-type PrefixLogger struct {
- prefix string
-}
-
-// Infof does info logging.
-func (pl *PrefixLogger) Infof(format string, args ...interface{}) {
- if pl != nil {
- // Handle nil, so the tests can pass in a nil logger.
- format = pl.prefix + format
- }
- Logger.Infof(format, args...)
-}
-
-// Warningf does warning logging.
-func (pl *PrefixLogger) Warningf(format string, args ...interface{}) {
- if pl != nil {
- format = pl.prefix + format
- }
- Logger.Warningf(format, args...)
-}
-
-// Errorf does error logging.
-func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
- if pl != nil {
- format = pl.prefix + format
- }
- Logger.Errorf(format, args...)
-}
-
-// Debugf does info logging at verbose level 2.
-func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
- if Logger.V(2) {
- pl.Infof(format, args...)
- }
-}
-
-// NewPrefixLogger creates a prefix logger with the given prefix.
-func NewPrefixLogger(prefix string) *PrefixLogger {
- return &PrefixLogger{prefix: prefix}
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpcutil/target.go b/vendor/google.golang.org/grpc/internal/grpcutil/target.go
deleted file mode 100644
index 80b33cd..0000000
--- a/vendor/google.golang.org/grpc/internal/grpcutil/target.go
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package grpcutil provides a bunch of utility functions to be used across the
-// gRPC codebase.
-package grpcutil
-
-import (
- "strings"
-
- "google.golang.org/grpc/resolver"
-)
-
-// split2 returns the values from strings.SplitN(s, sep, 2).
-// If sep is not found, it returns ("", "", false) instead.
-func split2(s, sep string) (string, string, bool) {
- spl := strings.SplitN(s, sep, 2)
- if len(spl) < 2 {
- return "", "", false
- }
- return spl[0], spl[1], true
-}
-
-// ParseTarget splits target into a resolver.Target struct containing scheme,
-// authority and endpoint.
-//
-// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
-// target}.
-func ParseTarget(target string) (ret resolver.Target) {
- var ok bool
- ret.Scheme, ret.Endpoint, ok = split2(target, "://")
- if !ok {
- return resolver.Target{Endpoint: target}
- }
- ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
- if !ok {
- return resolver.Target{Endpoint: target}
- }
- return ret
-}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index c6fbe8b..b96b359 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -28,6 +28,8 @@
)
var (
+ // WithResolverBuilder is set by dialoptions.go
+ WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
// WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
@@ -37,6 +39,11 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
+ // StatusRawProto is exported by status/status.go. This func returns a
+ // pointer to the wrapped Status proto for a given status.Status without a
+ // call to proto.Clone(). The returned Status proto should not be mutated by
+ // the caller.
+ StatusRawProto interface{} // func (*status.Status) *spb.Status
// NewRequestInfoContext creates a new context based on the argument context attaching
// the passed in RequestInfo to the new context.
NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
@@ -53,7 +60,7 @@
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
-type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), serviceName string) error
+type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index c368db6..abc0f92 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -32,23 +32,20 @@
"sync"
"time"
+ "google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/envconfig"
+ internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
- "google.golang.org/grpc/serviceconfig"
)
-// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
-// addresses from SRV records. Must not be changed after init time.
-var EnableSRVLookups = false
-
func init() {
resolver.Register(NewBuilder())
}
const (
defaultPort = "443"
+ defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
@@ -98,33 +95,49 @@
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
- return &dnsBuilder{}
+ return &dnsBuilder{minFreq: defaultFreq}
}
-type dnsBuilder struct{}
+type dnsBuilder struct {
+ // minimum frequency of polling the DNS server.
+ minFreq time.Duration
+}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
-func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint, defaultPort)
if err != nil {
return nil, err
}
// IP address.
- if ipAddr, ok := formatIP(host); ok {
- addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
- cc.UpdateState(resolver.State{Addresses: addr})
- return deadResolver{}, nil
+ if net.ParseIP(host) != nil {
+ host, _ = formatIP(host)
+ addr := []resolver.Address{{Addr: host + ":" + port}}
+ i := &ipResolver{
+ cc: cc,
+ ip: addr,
+ rn: make(chan struct{}, 1),
+ q: make(chan struct{}),
+ }
+ cc.NewAddress(addr)
+ go i.watcher()
+ return i, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
+ bc := backoff.DefaultConfig
+ bc.MaxDelay = b.minFreq
d := &dnsResolver{
+ freq: b.minFreq,
+ backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
+ t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
@@ -140,7 +153,6 @@
d.wg.Add(1)
go d.watcher()
- d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@@ -155,23 +167,53 @@
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
-// deadResolver is a resolver that does nothing.
-type deadResolver struct{}
+// ipResolver watches for the name resolution update for an IP address.
+type ipResolver struct {
+ cc resolver.ClientConn
+ ip []resolver.Address
+ // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ rn chan struct{}
+ q chan struct{}
+}
-func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}
+// ResolveNow resends the address it stores; no resolution is needed.
+func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
+ select {
+ case i.rn <- struct{}{}:
+ default:
+ }
+}
-func (deadResolver) Close() {}
+// Close closes the ipResolver.
+func (i *ipResolver) Close() {
+ close(i.q)
+}
+
+func (i *ipResolver) watcher() {
+ for {
+ select {
+ case <-i.rn:
+ i.cc.NewAddress(i.ip)
+ case <-i.q:
+ return
+ }
+ }
+}
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
- host string
- port string
- resolver netResolver
- ctx context.Context
- cancel context.CancelFunc
- cc resolver.ClientConn
+ freq time.Duration
+ backoff internalbackoff.Exponential
+ retryCount int
+ host string
+ port string
+ resolver netResolver
+ ctx context.Context
+ cancel context.CancelFunc
+ cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
+ t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
@@ -183,7 +225,7 @@
}
// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
-func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
+func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
select {
case d.rn <- struct{}{}:
default:
@@ -194,6 +236,7 @@
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
+ d.t.Stop()
}
func (d *dnsResolver) watcher() {
@@ -202,15 +245,27 @@
select {
case <-d.ctx.Done():
return
+ case <-d.t.C:
case <-d.rn:
+ if !d.t.Stop() {
+ // Before resetting a timer, it should be stopped to prevent racing with
+ // reads on its channel.
+ <-d.t.C
+ }
}
- state, err := d.lookup()
- if err != nil {
- d.cc.ReportError(err)
+ result, sc := d.lookup()
+ // Next lookup should happen within an interval defined by d.freq. It may be
+ // more often due to exponential retry on empty address list.
+ if len(result) == 0 {
+ d.retryCount++
+ d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
- d.cc.UpdateState(*state)
+ d.retryCount = 0
+ d.t.Reset(d.freq)
}
+ d.cc.NewServiceConfig(sc)
+ d.cc.NewAddress(result)
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
@@ -224,68 +279,37 @@
}
}
-func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
- if !EnableSRVLookups {
- return nil, nil
- }
+func (d *dnsResolver) lookupSRV() []resolver.Address {
var newAddrs []resolver.Address
_, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
if err != nil {
- err = handleDNSError(err, "SRV") // may become nil
- return nil, err
+ grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
+ return nil
}
for _, s := range srvs {
lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
if err != nil {
- err = handleDNSError(err, "A") // may become nil
- if err == nil {
- // If there are other SRV records, look them up and ignore this
- // one that does not exist.
- continue
- }
- return nil, err
+ grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
+ continue
}
for _, a := range lbAddrs {
- ip, ok := formatIP(a)
+ a, ok := formatIP(a)
if !ok {
- return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
}
- addr := ip + ":" + strconv.Itoa(int(s.Port))
+ addr := a + ":" + strconv.Itoa(int(s.Port))
newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
}
}
- return newAddrs, nil
+ return newAddrs
}
-var filterError = func(err error) error {
- if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
- // Timeouts and temporary errors should be communicated to gRPC to
- // attempt another DNS query (with backoff). Other errors should be
- // suppressed (they may represent the absence of a TXT record).
- return nil
- }
- return err
-}
-
-func handleDNSError(err error, lookupType string) error {
- err = filterError(err)
- if err != nil {
- err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
- grpclog.Infoln(err)
- }
- return err
-}
-
-func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
+func (d *dnsResolver) lookupTXT() string {
ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
if err != nil {
- if envconfig.TXTErrIgnore {
- return nil
- }
- if err = handleDNSError(err, "TXT"); err != nil {
- return &serviceconfig.ParseResult{Err: err}
- }
- return nil
+ grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
+ return ""
}
var res string
for _, s := range ss {
@@ -294,45 +318,40 @@
// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
- grpclog.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
- // This is not an error; it is the equivalent of not having a service config.
- return nil
+ grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
+ return ""
}
- sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
- return d.cc.ParseServiceConfig(sc)
+ return strings.TrimPrefix(res, txtAttribute)
}
-func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
+func (d *dnsResolver) lookupHost() []resolver.Address {
var newAddrs []resolver.Address
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
if err != nil {
- err = handleDNSError(err, "A")
- return nil, err
+ grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
+ return nil
}
for _, a := range addrs {
- ip, ok := formatIP(a)
+ a, ok := formatIP(a)
if !ok {
- return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
}
- addr := ip + ":" + d.port
+ addr := a + ":" + d.port
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
- return newAddrs, nil
+ return newAddrs
}
-func (d *dnsResolver) lookup() (*resolver.State, error) {
- srv, srvErr := d.lookupSRV()
- addrs, hostErr := d.lookupHost()
- if hostErr != nil && (srvErr != nil || len(srv) == 0) {
- return nil, hostErr
+func (d *dnsResolver) lookup() ([]resolver.Address, string) {
+ newAddrs := d.lookupSRV()
+ // Support fallback to non-balancer address.
+ newAddrs = append(newAddrs, d.lookupHost()...)
+ if d.disableServiceConfig {
+ return newAddrs, ""
}
- state := &resolver.State{
- Addresses: append(addrs, srv...),
- }
- if !d.disableServiceConfig {
- state.ServiceConfig = d.lookupTXT()
- }
- return state, nil
+ sc := d.lookupTXT()
+ return newAddrs, canaryingSC(sc)
}
// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
@@ -418,12 +437,12 @@
var rcs []rawChoice
err := json.Unmarshal([]byte(js), &rcs)
if err != nil {
- grpclog.Warningf("dns: error parsing service config json: %v", err)
+ grpclog.Warningf("grpc: failed to parse service config json string due to %v.\n", err)
return ""
}
cliHostname, err := os.Hostname()
if err != nil {
- grpclog.Warningf("dns: error getting client hostname: %v", err)
+ grpclog.Warningf("grpc: failed to get client hostname due to %v.\n", err)
return ""
}
var sc string
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go b/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
deleted file mode 100644
index 8783a8c..0000000
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
+++ /dev/null
@@ -1,33 +0,0 @@
-// +build go1.13
-
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package dns
-
-import "net"
-
-func init() {
- filterError = func(err error) error {
- if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
- // The name does not exist; not an error.
- return nil
- }
- return err
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
index 520d922..893d5d1 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
@@ -26,7 +26,7 @@
type passthroughBuilder struct{}
-func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
r := &passthroughResolver{
target: target,
cc: cc,
@@ -48,7 +48,7 @@
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
-func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}
+func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}
func (*passthroughResolver) Close() {}
diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go
deleted file mode 100644
index 6812606..0000000
--- a/vendor/google.golang.org/grpc/internal/status/status.go
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package status implements errors returned by gRPC. These errors are
-// serialized and transmitted on the wire between server and client, and allow
-// for additional data to be transmitted via the Details field in the status
-// proto. gRPC service handlers should return an error created by this
-// package, and gRPC clients should expect a corresponding error to be
-// returned from the RPC call.
-//
-// This package upholds the invariants that a non-nil error may not
-// contain an OK code, and an OK code must result in a nil error.
-package status
-
-import (
- "errors"
- "fmt"
-
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- spb "google.golang.org/genproto/googleapis/rpc/status"
- "google.golang.org/grpc/codes"
-)
-
-// Status represents an RPC status code, message, and details. It is immutable
-// and should be created with New, Newf, or FromProto.
-type Status struct {
- s *spb.Status
-}
-
-// New returns a Status representing c and msg.
-func New(c codes.Code, msg string) *Status {
- return &Status{s: &spb.Status{Code: int32(c), Message: msg}}
-}
-
-// Newf returns New(c, fmt.Sprintf(format, a...)).
-func Newf(c codes.Code, format string, a ...interface{}) *Status {
- return New(c, fmt.Sprintf(format, a...))
-}
-
-// FromProto returns a Status representing s.
-func FromProto(s *spb.Status) *Status {
- return &Status{s: proto.Clone(s).(*spb.Status)}
-}
-
-// Err returns an error representing c and msg. If c is OK, returns nil.
-func Err(c codes.Code, msg string) error {
- return New(c, msg).Err()
-}
-
-// Errorf returns Error(c, fmt.Sprintf(format, a...)).
-func Errorf(c codes.Code, format string, a ...interface{}) error {
- return Err(c, fmt.Sprintf(format, a...))
-}
-
-// Code returns the status code contained in s.
-func (s *Status) Code() codes.Code {
- if s == nil || s.s == nil {
- return codes.OK
- }
- return codes.Code(s.s.Code)
-}
-
-// Message returns the message contained in s.
-func (s *Status) Message() string {
- if s == nil || s.s == nil {
- return ""
- }
- return s.s.Message
-}
-
-// Proto returns s's status as an spb.Status proto message.
-func (s *Status) Proto() *spb.Status {
- if s == nil {
- return nil
- }
- return proto.Clone(s.s).(*spb.Status)
-}
-
-// Err returns an immutable error representing s; returns nil if s.Code() is OK.
-func (s *Status) Err() error {
- if s.Code() == codes.OK {
- return nil
- }
- return (*Error)(s.Proto())
-}
-
-// WithDetails returns a new status with the provided details messages appended to the status.
-// If any errors are encountered, it returns nil and the first error encountered.
-func (s *Status) WithDetails(details ...proto.Message) (*Status, error) {
- if s.Code() == codes.OK {
- return nil, errors.New("no error details for status with code OK")
- }
- // s.Code() != OK implies that s.Proto() != nil.
- p := s.Proto()
- for _, detail := range details {
- any, err := ptypes.MarshalAny(detail)
- if err != nil {
- return nil, err
- }
- p.Details = append(p.Details, any)
- }
- return &Status{s: p}, nil
-}
-
-// Details returns a slice of details messages attached to the status.
-// If a detail cannot be decoded, the error is returned in place of the detail.
-func (s *Status) Details() []interface{} {
- if s == nil || s.s == nil {
- return nil
- }
- details := make([]interface{}, 0, len(s.s.Details))
- for _, any := range s.s.Details {
- detail := &ptypes.DynamicAny{}
- if err := ptypes.UnmarshalAny(any, detail); err != nil {
- details = append(details, err)
- continue
- }
- details = append(details, detail.Message)
- }
- return details
-}
-
-// Error is an alias of a status proto. It implements error and Status,
-// and a nil Error should never be returned by this package.
-type Error spb.Status
-
-func (se *Error) Error() string {
- p := (*spb.Status)(se)
- return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(p.GetCode()), p.GetMessage())
-}
-
-// GRPCStatus returns the Status represented by se.
-func (se *Error) GRPCStatus() *Status {
- return FromProto((*spb.Status)(se))
-}
-
-// Is implements future error.Is functionality.
-// A Error is equivalent if the code and message are identical.
-func (se *Error) Is(target error) bool {
- tse, ok := target.(*Error)
- if !ok {
- return false
- }
- return proto.Equal((*spb.Status)(se), (*spb.Status)(tse))
-}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index fc44e97..78f9ddc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -112,10 +112,11 @@
// at this point to be speaking over HTTP/2, so it's able to speak valid
// gRPC.
type serverHandlerTransport struct {
- rw http.ResponseWriter
- req *http.Request
- timeoutSet bool
- timeout time.Duration
+ rw http.ResponseWriter
+ req *http.Request
+ timeoutSet bool
+ timeout time.Duration
+ didCommonHeaders bool
headerMD metadata.MD
@@ -185,11 +186,8 @@
ht.writeStatusMu.Lock()
defer ht.writeStatusMu.Unlock()
- headersWritten := s.updateHeaderSent()
err := ht.do(func() {
- if !headersWritten {
- ht.writePendingHeaders(s)
- }
+ ht.writeCommonHeaders(s)
// And flush, in case no header or body has been sent yet.
// This forces a separation of headers and trailers if this is the
@@ -229,27 +227,21 @@
if err == nil { // transport has not been closed
if ht.stats != nil {
- // Note: The trailer fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
- ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
- Trailer: s.trailer.Copy(),
- })
+ ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
}
ht.Close()
return err
}
-// writePendingHeaders sets common and custom headers on the first
-// write call (Write, WriteHeader, or WriteStatus)
-func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
- ht.writeCommonHeaders(s)
- ht.writeCustomHeaders(s)
-}
-
// writeCommonHeaders sets common headers on the first write
// call (Write, WriteHeader, or WriteStatus).
func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
+ if ht.didCommonHeaders {
+ return
+ }
+ ht.didCommonHeaders = true
+
h := ht.rw.Header()
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
h.Set("Content-Type", ht.contentType)
@@ -268,30 +260,9 @@
}
}
-// writeCustomHeaders sets custom headers set on the stream via SetHeader
-// on the first write call (Write, WriteHeader, or WriteStatus).
-func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
- h := ht.rw.Header()
-
- s.hdrMu.Lock()
- for k, vv := range s.header {
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- h.Add(k, encodeMetadataHeader(k, v))
- }
- }
-
- s.hdrMu.Unlock()
-}
-
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
- headersWritten := s.updateHeaderSent()
return ht.do(func() {
- if !headersWritten {
- ht.writePendingHeaders(s)
- }
+ ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
ht.rw.Write(data)
ht.rw.(http.Flusher).Flush()
@@ -299,28 +270,26 @@
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
- if err := s.SetHeader(md); err != nil {
- return err
- }
-
- headersWritten := s.updateHeaderSent()
err := ht.do(func() {
- if !headersWritten {
- ht.writePendingHeaders(s)
+ ht.writeCommonHeaders(s)
+ h := ht.rw.Header()
+ for k, vv := range md {
+ // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, v := range vv {
+ v = encodeMetadataHeader(k, v)
+ h.Add(k, v)
+ }
}
-
ht.rw.WriteHeader(200)
ht.rw.(http.Flusher).Flush()
})
if err == nil {
if ht.stats != nil {
- // Note: The header fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
- ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
- Header: md.Copy(),
- Compression: s.sendCompress,
- })
+ ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
}
}
return err
@@ -365,7 +334,7 @@
Addr: ht.RemoteAddr(),
}
if req.TLS != nil {
- pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
+ pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 1cc586f..294661a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -45,14 +45,9 @@
"google.golang.org/grpc/status"
)
-// clientConnectionCounter counts the number of connections a client has
-// initiated (equal to the number of http2Clients created). Must be accessed
-// atomically.
-var clientConnectionCounter uint64
-
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
- lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
+ lastRead int64 // keep this field 64-bit aligned
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -131,8 +126,6 @@
onClose func()
bufferPool *bufferPool
-
- connectionID uint64
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -336,8 +329,6 @@
}
}
- t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
-
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
@@ -403,8 +394,7 @@
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
- Method: callHdr.Method,
- AuthInfo: t.authInfo,
+ Method: callHdr.Method,
}
ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
@@ -434,7 +424,6 @@
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
- headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
@@ -680,21 +669,12 @@
}
}
if t.statsHandler != nil {
- header, ok := metadata.FromOutgoingContext(ctx)
- if ok {
- header.Set("user-agent", t.userAgent)
- } else {
- header = metadata.Pairs("user-agent", t.userAgent)
- }
- // Note: The header fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
- Header: header,
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
@@ -1195,17 +1175,14 @@
if t.statsHandler != nil {
if isHeader {
inHeader := &stats.InHeader{
- Client: true,
- WireLength: int(frame.Header().Length),
- Header: s.header.Copy(),
- Compression: s.recvCompress,
+ Client: true,
+ WireLength: int(frame.Header().Length),
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
- Trailer: s.trailer.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
@@ -1392,6 +1369,7 @@
// acked).
sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
+ prevNano = lastRead
timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index fa33ffb..0760383 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,9 +35,11 @@
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@@ -55,15 +57,13 @@
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+ // statusRawProto is a function to get to the raw status proto wrapped in a
+ // status.Status without a proto.Clone().
+ statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
-// serverConnectionCounter counts the number of connections a server has seen
-// (equal to the number of http2Servers created). Must be accessed atomically.
-var serverConnectionCounter uint64
-
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
- lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
@@ -83,8 +83,12 @@
controlBuf *controlBuffer
fc *trInFlow
stats stats.Handler
+ // Flag to keep track of reading activity on transport.
+ // 1 is true and 0 is false.
+ activity uint32 // Accessed atomically.
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
+
// Keepalive enforcement policy.
kep keepalive.EnforcementPolicy
// The time instance last ping was received.
@@ -120,8 +124,6 @@
channelzID int64 // channelz unique identification number
czData *channelzData
bufferPool *bufferPool
-
- connectionID uint64
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -251,9 +253,6 @@
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
}
-
- t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
-
t.framer.writer.Flush()
defer func() {
@@ -278,7 +277,7 @@
if err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
}
- atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
+ atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -417,7 +416,6 @@
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
- Header: metadata.MD(state.data.mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
@@ -451,7 +449,7 @@
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
- atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
+ atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
@@ -808,12 +806,9 @@
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
- // Note: Headers are compressed with hpack after this call returns.
- // No WireLength field is set here.
- outHeader := &stats.OutHeader{
- Header: s.header.Copy(),
- Compression: s.sendCompress,
- }
+ // Note: WireLength is not set in outHeader.
+ // TODO(mmukhi): Revisit this later, if needed.
+ outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader)
}
return nil
@@ -845,7 +840,7 @@
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
- if p := st.Proto(); p != nil && len(p.Details) > 0 {
+ if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@@ -876,11 +871,7 @@
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
- // Note: The trailer fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
- t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
- Trailer: s.trailer.Copy(),
- })
+ t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
return nil
}
@@ -941,35 +932,32 @@
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
p := &ping{}
- // True iff a ping has been sent, and no data has been received since then.
- outstandingPing := false
- // Amount of time remaining before which we should receive an ACK for the
- // last sent ping.
- kpTimeoutLeft := time.Duration(0)
- // Records the last value of t.lastRead before we go block on the timer.
- // This is required to check for read activity since then.
- prevNano := time.Now().UnixNano()
- // Initialize the different timers to their default values.
- idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
- ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
- kpTimer := time.NewTimer(t.kp.Time)
+ var pingSent bool
+ maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
+ maxAge := time.NewTimer(t.kp.MaxConnectionAge)
+ keepalive := time.NewTimer(t.kp.Time)
+ // NOTE: All exit paths of this function should reset their
+ // respective timers. A failure to do so will cause the
+ // following clean-up to deadlock and eventually leak.
defer func() {
- // We need to drain the underlying channel in these timers after a call
- // to Stop(), only if we are interested in resetting them. Clearly we
- // are not interested in resetting them here.
- idleTimer.Stop()
- ageTimer.Stop()
- kpTimer.Stop()
+ if !maxIdle.Stop() {
+ <-maxIdle.C
+ }
+ if !maxAge.Stop() {
+ <-maxAge.C
+ }
+ if !keepalive.Stop() {
+ <-keepalive.C
+ }
}()
-
for {
select {
- case <-idleTimer.C:
+ case <-maxIdle.C:
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
- idleTimer.Reset(t.kp.MaxConnectionIdle)
+ maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
@@ -978,51 +966,43 @@
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
+ // Resetting the timer so that the clean-up doesn't deadlock.
+ maxIdle.Reset(infinity)
return
}
- idleTimer.Reset(val)
- case <-ageTimer.C:
+ maxIdle.Reset(val)
+ case <-maxAge.C:
t.drain(http2.ErrCodeNo, []byte{})
- ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
+ maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
- case <-ageTimer.C:
+ case <-maxAge.C:
// Close the connection after grace period.
infof("transport: closing server transport due to maximum connection age.")
t.Close()
+ // Resetting the timer so that the clean-up doesn't deadlock.
+ maxAge.Reset(infinity)
case <-t.done:
}
return
- case <-kpTimer.C:
- lastRead := atomic.LoadInt64(&t.lastRead)
- if lastRead > prevNano {
- // There has been read activity since the last time we were
- // here. Setup the timer to fire at kp.Time seconds from
- // lastRead time and continue.
- outstandingPing = false
- kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
- prevNano = lastRead
+ case <-keepalive.C:
+ if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
+ pingSent = false
+ keepalive.Reset(t.kp.Time)
continue
}
- if outstandingPing && kpTimeoutLeft <= 0 {
+ if pingSent {
infof("transport: closing server transport due to idleness.")
t.Close()
+ // Resetting the timer so that the clean-up doesn't deadlock.
+ keepalive.Reset(infinity)
return
}
- if !outstandingPing {
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
- }
- t.controlBuf.put(p)
- kpTimeoutLeft = t.kp.Timeout
- outstandingPing = true
+ pingSent = true
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
}
- // The amount of time to sleep here is the minimum of kp.Time and
- // timeoutLeft. This will ensure that we wait only for kp.Time
- // before sending out the next ping (for cases where the ping is
- // acked).
- sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
- kpTimeoutLeft -= sleepDuration
- kpTimer.Reset(sleepDuration)
+ t.controlBuf.put(p)
+ keepalive.Reset(t.kp.Timeout)
case <-t.done:
return
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index a30da9e..bfab940 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -73,11 +73,10 @@
}
// recvBuffer is an unbounded channel of recvMsg structs.
-//
-// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
-// holds a channel of recvMsg structs instead of objects implementing "item"
-// interface. recvBuffer is written to much more often and using strict recvMsg
-// structs helps avoid allocation in "recvBuffer.put"
+// Note recvBuffer differs from controlBuffer only in that recvBuffer
+// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
+// recvBuffer is written to much more often than
+// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
c chan recvMsg
mu sync.Mutex