[SEBA-930] update GRPC version to 1.27 and change kafka message producing
Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index 4062c02..8b10516 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -98,7 +98,7 @@
// New methodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *methodLoggerConfig) error {
if _, ok := l.services[service]; ok {
- return fmt.Errorf("conflicting rules for service %v found", service)
+ return fmt.Errorf("conflicting service rules for service %v found", service)
}
if l.services == nil {
l.services = make(map[string]*methodLoggerConfig)
@@ -112,10 +112,10 @@
// New methodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *methodLoggerConfig) error {
if _, ok := l.blacklist[method]; ok {
- return fmt.Errorf("conflicting rules for method %v found", method)
+ return fmt.Errorf("conflicting blacklist rules for method %v found", method)
}
if _, ok := l.methods[method]; ok {
- return fmt.Errorf("conflicting rules for method %v found", method)
+ return fmt.Errorf("conflicting method rules for method %v found", method)
}
if l.methods == nil {
l.methods = make(map[string]*methodLoggerConfig)
@@ -127,10 +127,10 @@
// Set blacklist method for "-service/method".
func (l *logger) setBlacklist(method string) error {
if _, ok := l.blacklist[method]; ok {
- return fmt.Errorf("conflicting rules for method %v found", method)
+ return fmt.Errorf("conflicting blacklist rules for method %v found", method)
}
if _, ok := l.methods[method]; ok {
- return fmt.Errorf("conflicting rules for method %v found", method)
+ return fmt.Errorf("conflicting method rules for method %v found", method)
}
if l.blacklist == nil {
l.blacklist = make(map[string]struct{})
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
index 2cb3109..9f6a0c1 100644
--- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -26,6 +26,13 @@
//
// All methods on this type are thread-safe and don't block on anything except
// the underlying mutex used for synchronization.
+//
+// Unbounded supports values of any type to be stored in it by using a channel
+// of `interface{}`. This means that a call to Put() incurs an extra memory
+// allocation, and also that users need a type assertion while reading. For
+// performance critical code paths, using Unbounded is strongly discouraged and
+// defining a new type specific implementation of this buffer is preferred. See
+// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
mu sync.Mutex
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 3ee8740..ae6c897 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,11 +25,14 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
+ txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
+ // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
+ TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index b96b359..0912f0b 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -28,8 +28,6 @@
)
var (
- // WithResolverBuilder is set by dialoptions.go
- WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
// WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
@@ -60,7 +58,7 @@
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
-type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
+type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index abc0f92..c368db6 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -32,20 +32,23 @@
"sync"
"time"
- "google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
- internalbackoff "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
)
+// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
+// addresses from SRV records. Must not be changed after init time.
+var EnableSRVLookups = false
+
func init() {
resolver.Register(NewBuilder())
}
const (
defaultPort = "443"
- defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
@@ -95,49 +98,33 @@
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
- return &dnsBuilder{minFreq: defaultFreq}
+ return &dnsBuilder{}
}
-type dnsBuilder struct {
- // minimum frequency of polling the DNS server.
- minFreq time.Duration
-}
+type dnsBuilder struct{}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
-func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint, defaultPort)
if err != nil {
return nil, err
}
// IP address.
- if net.ParseIP(host) != nil {
- host, _ = formatIP(host)
- addr := []resolver.Address{{Addr: host + ":" + port}}
- i := &ipResolver{
- cc: cc,
- ip: addr,
- rn: make(chan struct{}, 1),
- q: make(chan struct{}),
- }
- cc.NewAddress(addr)
- go i.watcher()
- return i, nil
+ if ipAddr, ok := formatIP(host); ok {
+ addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
+ cc.UpdateState(resolver.State{Addresses: addr})
+ return deadResolver{}, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
- bc := backoff.DefaultConfig
- bc.MaxDelay = b.minFreq
d := &dnsResolver{
- freq: b.minFreq,
- backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
- t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
@@ -153,6 +140,7 @@
d.wg.Add(1)
go d.watcher()
+ d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@@ -167,53 +155,23 @@
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
-// ipResolver watches for the name resolution update for an IP address.
-type ipResolver struct {
- cc resolver.ClientConn
- ip []resolver.Address
- // rn channel is used by ResolveNow() to force an immediate resolution of the target.
- rn chan struct{}
- q chan struct{}
-}
+// deadResolver is a resolver that does nothing.
+type deadResolver struct{}
-// ResolveNow resend the address it stores, no resolution is needed.
-func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
- select {
- case i.rn <- struct{}{}:
- default:
- }
-}
+func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}
-// Close closes the ipResolver.
-func (i *ipResolver) Close() {
- close(i.q)
-}
-
-func (i *ipResolver) watcher() {
- for {
- select {
- case <-i.rn:
- i.cc.NewAddress(i.ip)
- case <-i.q:
- return
- }
- }
-}
+func (deadResolver) Close() {}
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
- freq time.Duration
- backoff internalbackoff.Exponential
- retryCount int
- host string
- port string
- resolver netResolver
- ctx context.Context
- cancel context.CancelFunc
- cc resolver.ClientConn
+ host string
+ port string
+ resolver netResolver
+ ctx context.Context
+ cancel context.CancelFunc
+ cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
- t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
@@ -225,7 +183,7 @@
}
// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
-func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
+func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
@@ -236,7 +194,6 @@
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
- d.t.Stop()
}
func (d *dnsResolver) watcher() {
@@ -245,27 +202,15 @@
select {
case <-d.ctx.Done():
return
- case <-d.t.C:
case <-d.rn:
- if !d.t.Stop() {
- // Before resetting a timer, it should be stopped to prevent racing with
- // reads on it's channel.
- <-d.t.C
- }
}
- result, sc := d.lookup()
- // Next lookup should happen within an interval defined by d.freq. It may be
- // more often due to exponential retry on empty address list.
- if len(result) == 0 {
- d.retryCount++
- d.t.Reset(d.backoff.Backoff(d.retryCount))
+ state, err := d.lookup()
+ if err != nil {
+ d.cc.ReportError(err)
} else {
- d.retryCount = 0
- d.t.Reset(d.freq)
+ d.cc.UpdateState(*state)
}
- d.cc.NewServiceConfig(sc)
- d.cc.NewAddress(result)
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
@@ -279,37 +224,68 @@
}
}
-func (d *dnsResolver) lookupSRV() []resolver.Address {
+func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
+ if !EnableSRVLookups {
+ return nil, nil
+ }
var newAddrs []resolver.Address
_, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
if err != nil {
- grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
- return nil
+ err = handleDNSError(err, "SRV") // may become nil
+ return nil, err
}
for _, s := range srvs {
lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
if err != nil {
- grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
- continue
- }
- for _, a := range lbAddrs {
- a, ok := formatIP(a)
- if !ok {
- grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ err = handleDNSError(err, "A") // may become nil
+ if err == nil {
+ // If there are other SRV records, look them up and ignore this
+ // one that does not exist.
continue
}
- addr := a + ":" + strconv.Itoa(int(s.Port))
+ return nil, err
+ }
+ for _, a := range lbAddrs {
+ ip, ok := formatIP(a)
+ if !ok {
+ return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
+ }
+ addr := ip + ":" + strconv.Itoa(int(s.Port))
newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
}
}
- return newAddrs
+ return newAddrs, nil
}
-func (d *dnsResolver) lookupTXT() string {
+var filterError = func(err error) error {
+ if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
+ // Timeouts and temporary errors should be communicated to gRPC to
+ // attempt another DNS query (with backoff). Other errors should be
+ // suppressed (they may represent the absence of a TXT record).
+ return nil
+ }
+ return err
+}
+
+func handleDNSError(err error, lookupType string) error {
+ err = filterError(err)
+ if err != nil {
+ err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
+ grpclog.Infoln(err)
+ }
+ return err
+}
+
+func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
if err != nil {
- grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
- return ""
+ if envconfig.TXTErrIgnore {
+ return nil
+ }
+ if err = handleDNSError(err, "TXT"); err != nil {
+ return &serviceconfig.ParseResult{Err: err}
+ }
+ return nil
}
var res string
for _, s := range ss {
@@ -318,40 +294,45 @@
// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
- grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
- return ""
+ grpclog.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute)
+ // This is not an error; it is the equivalent of not having a service config.
+ return nil
}
- return strings.TrimPrefix(res, txtAttribute)
+ sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
+ return d.cc.ParseServiceConfig(sc)
}
-func (d *dnsResolver) lookupHost() []resolver.Address {
+func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
var newAddrs []resolver.Address
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
if err != nil {
- grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
- return nil
+ err = handleDNSError(err, "A")
+ return nil, err
}
for _, a := range addrs {
- a, ok := formatIP(a)
+ ip, ok := formatIP(a)
if !ok {
- grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
- continue
+ return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
}
- addr := a + ":" + d.port
+ addr := ip + ":" + d.port
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
}
- return newAddrs
+ return newAddrs, nil
}
-func (d *dnsResolver) lookup() ([]resolver.Address, string) {
- newAddrs := d.lookupSRV()
- // Support fallback to non-balancer address.
- newAddrs = append(newAddrs, d.lookupHost()...)
- if d.disableServiceConfig {
- return newAddrs, ""
+func (d *dnsResolver) lookup() (*resolver.State, error) {
+ srv, srvErr := d.lookupSRV()
+ addrs, hostErr := d.lookupHost()
+ if hostErr != nil && (srvErr != nil || len(srv) == 0) {
+ return nil, hostErr
}
- sc := d.lookupTXT()
- return newAddrs, canaryingSC(sc)
+ state := &resolver.State{
+ Addresses: append(addrs, srv...),
+ }
+ if !d.disableServiceConfig {
+ state.ServiceConfig = d.lookupTXT()
+ }
+ return state, nil
}
// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
@@ -437,12 +418,12 @@
var rcs []rawChoice
err := json.Unmarshal([]byte(js), &rcs)
if err != nil {
- grpclog.Warningf("grpc: failed to parse service config json string due to %v.\n", err)
+ grpclog.Warningf("dns: error parsing service config json: %v", err)
return ""
}
cliHostname, err := os.Hostname()
if err != nil {
- grpclog.Warningf("grpc: failed to get client hostname due to %v.\n", err)
+ grpclog.Warningf("dns: error getting client hostname: %v", err)
return ""
}
var sc string
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go b/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
new file mode 100644
index 0000000..8783a8c
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/go113.go
@@ -0,0 +1,33 @@
+// +build go1.13
+
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package dns
+
+import "net"
+
+func init() {
+ filterError = func(err error) error {
+ if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
+ // The name does not exist; not an error.
+ return nil
+ }
+ return err
+ }
+}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
index 893d5d1..520d922 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
@@ -26,7 +26,7 @@
type passthroughBuilder struct{}
-func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &passthroughResolver{
target: target,
cc: cc,
@@ -48,7 +48,7 @@
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
-func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}
+func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*passthroughResolver) Close() {}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 78f9ddc..c3c32da 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -227,7 +227,9 @@
if err == nil { // transport has not been closed
if ht.stats != nil {
- ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+ ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+ Trailer: s.trailer.Copy(),
+ })
}
}
ht.Close()
@@ -289,7 +291,9 @@
if err == nil {
if ht.stats != nil {
- ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
+ ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
+ Header: md.Copy(),
+ })
}
}
return err
@@ -334,7 +338,7 @@
Addr: ht.RemoteAddr(),
}
if req.TLS != nil {
- pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
+ pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{credentials.PrivacyAndIntegrity}}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 294661a..2d6feeb 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -45,9 +45,14 @@
"google.golang.org/grpc/status"
)
+// clientConnectionCounter counts the number of connections a client has
+// initiated (equal to the number of http2Clients created). Must be accessed
+// atomically.
+var clientConnectionCounter uint64
+
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
- lastRead int64 // keep this field 64-bit aligned
+ lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -126,6 +131,8 @@
onClose func()
bufferPool *bufferPool
+
+ connectionID uint64
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -329,6 +336,8 @@
}
}
+ t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
+
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
@@ -394,7 +403,8 @@
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
- Method: callHdr.Method,
+ Method: callHdr.Method,
+ AuthInfo: t.authInfo,
}
ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
@@ -424,6 +434,7 @@
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
}
if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
@@ -669,12 +680,14 @@
}
}
if t.statsHandler != nil {
+ header, _, _ := metadata.FromOutgoingContextRaw(ctx)
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
+ Header: header.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
@@ -1177,12 +1190,14 @@
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
+ Header: s.header.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
+ Trailer: s.trailer.Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
}
@@ -1369,7 +1384,6 @@
// acked).
sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
- prevNano = lastRead
timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 0760383..8b04b03 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -62,8 +62,13 @@
statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
+// serverConnectionCounter counts the number of connections a server has seen
+// (equal to the number of http2Servers created). Must be accessed atomically.
+var serverConnectionCounter uint64
+
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
+ lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
@@ -83,12 +88,8 @@
controlBuf *controlBuffer
fc *trInFlow
stats stats.Handler
- // Flag to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
-
// Keepalive enforcement policy.
kep keepalive.EnforcementPolicy
// The time instance last ping was received.
@@ -124,6 +125,8 @@
channelzID int64 // channelz unique identification number
czData *channelzData
bufferPool *bufferPool
+
+ connectionID uint64
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -253,6 +256,9 @@
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
}
+
+ t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
+
t.framer.writer.Flush()
defer func() {
@@ -277,7 +283,7 @@
if err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
}
- atomic.StoreUint32(&t.activity, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -416,6 +422,7 @@
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
+ Header: metadata.MD(state.data.mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
}
@@ -449,7 +456,7 @@
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
- atomic.StoreUint32(&t.activity, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
@@ -808,7 +815,9 @@
if t.stats != nil {
// Note: WireLength is not set in outHeader.
// TODO(mmukhi): Revisit this later, if needed.
- outHeader := &stats.OutHeader{}
+ outHeader := &stats.OutHeader{
+ Header: s.header.Copy(),
+ }
t.stats.HandleRPC(s.Context(), outHeader)
}
return nil
@@ -871,7 +880,9 @@
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
- t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+ t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+ Trailer: s.trailer.Copy(),
+ })
}
return nil
}
@@ -932,32 +943,35 @@
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
p := &ping{}
- var pingSent bool
- maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
- maxAge := time.NewTimer(t.kp.MaxConnectionAge)
- keepalive := time.NewTimer(t.kp.Time)
- // NOTE: All exit paths of this function should reset their
- // respective timers. A failure to do so will cause the
- // following clean-up to deadlock and eventually leak.
+ // True iff a ping has been sent, and no data has been received since then.
+ outstandingPing := false
+ // Amount of time remaining before which we should receive an ACK for the
+ // last sent ping.
+ kpTimeoutLeft := time.Duration(0)
+ // Records the last value of t.lastRead before we go block on the timer.
+ // This is required to check for read activity since then.
+ prevNano := time.Now().UnixNano()
+ // Initialize the different timers to their default values.
+ idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
+ ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
+ kpTimer := time.NewTimer(t.kp.Time)
defer func() {
- if !maxIdle.Stop() {
- <-maxIdle.C
- }
- if !maxAge.Stop() {
- <-maxAge.C
- }
- if !keepalive.Stop() {
- <-keepalive.C
- }
+ // We need to drain the underlying channel in these timers after a call
+ // to Stop(), only if we are interested in resetting them. Clearly we
+ // are not interested in resetting them here.
+ idleTimer.Stop()
+ ageTimer.Stop()
+ kpTimer.Stop()
}()
+
for {
select {
- case <-maxIdle.C:
+ case <-idleTimer.C:
t.mu.Lock()
idle := t.idle
if idle.IsZero() { // The connection is non-idle.
t.mu.Unlock()
- maxIdle.Reset(t.kp.MaxConnectionIdle)
+ idleTimer.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
@@ -966,43 +980,51 @@
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
- // Resetting the timer so that the clean-up doesn't deadlock.
- maxIdle.Reset(infinity)
return
}
- maxIdle.Reset(val)
- case <-maxAge.C:
+ idleTimer.Reset(val)
+ case <-ageTimer.C:
t.drain(http2.ErrCodeNo, []byte{})
- maxAge.Reset(t.kp.MaxConnectionAgeGrace)
+ ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
- case <-maxAge.C:
+ case <-ageTimer.C:
// Close the connection after grace period.
infof("transport: closing server transport due to maximum connection age.")
t.Close()
- // Resetting the timer so that the clean-up doesn't deadlock.
- maxAge.Reset(infinity)
case <-t.done:
}
return
- case <-keepalive.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- pingSent = false
- keepalive.Reset(t.kp.Time)
+ case <-kpTimer.C:
+ lastRead := atomic.LoadInt64(&t.lastRead)
+ if lastRead > prevNano {
+ // There has been read activity since the last time we were
+ // here. Setup the timer to fire at kp.Time seconds from
+ // lastRead time and continue.
+ outstandingPing = false
+ kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+ prevNano = lastRead
continue
}
- if pingSent {
+ if outstandingPing && kpTimeoutLeft <= 0 {
infof("transport: closing server transport due to idleness.")
t.Close()
- // Resetting the timer so that the clean-up doesn't deadlock.
- keepalive.Reset(infinity)
return
}
- pingSent = true
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
+ if !outstandingPing {
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
+ }
+ t.controlBuf.put(p)
+ kpTimeoutLeft = t.kp.Timeout
+ outstandingPing = true
}
- t.controlBuf.put(p)
- keepalive.Reset(t.kp.Timeout)
+ // The amount of time to sleep here is the minimum of kp.Time and
+ // kpTimeoutLeft. This will ensure that we wait only for kp.Time
+ // before sending out the next ping (for cases where the ping is
+ // acked).
+ sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
+ kpTimeoutLeft -= sleepDuration
+ kpTimer.Reset(sleepDuration)
case <-t.done:
return
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index bfab940..a30da9e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -73,10 +73,11 @@
}
// recvBuffer is an unbounded channel of recvMsg structs.
-// Note recvBuffer differs from controlBuffer only in that recvBuffer
-// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
-// recvBuffer is written to much more often than
-// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
+//
+// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
+// holds a channel of recvMsg structs instead of `interface{}` values.
+// recvBuffer is written to much more often and using strict recvMsg
+// structs helps avoid allocation in "recvBuffer.put".
type recvBuffer struct {
c chan recvMsg
mu sync.Mutex