Interim commit: go fmt cleanup and vendored google.golang.org/grpc update
Change-Id: I617a7d771b50c2b7999eabbbebbabef4e68d8713
diff --git a/vendor/google.golang.org/grpc/.travis.yml b/vendor/google.golang.org/grpc/.travis.yml
index 024408e..87f40b8 100644
--- a/vendor/google.golang.org/grpc/.travis.yml
+++ b/vendor/google.golang.org/grpc/.travis.yml
@@ -2,18 +2,20 @@
matrix:
include:
- - go: 1.12.x
+ - go: 1.13.x
env: VET=1 GO111MODULE=on
- - go: 1.12.x
+ - go: 1.13.x
env: RACE=1 GO111MODULE=on
- - go: 1.12.x
+ - go: 1.13.x
env: RUN386=1
- - go: 1.12.x
+ - go: 1.13.x
env: GRPC_GO_RETRY=on
+ - go: 1.13.x
+ env: TESTEXAMPLES=1
+ - go: 1.12.x
+ env: GO111MODULE=on
- go: 1.11.x
env: GO111MODULE=on
- - go: 1.10.x
- - go: 1.9.x
- go: 1.9.x
env: GAE=1
@@ -23,17 +25,18 @@
- if [[ "${GO111MODULE}" = "on" ]]; then mkdir "${HOME}/go"; export GOPATH="${HOME}/go"; fi
- if [[ -n "${RUN386}" ]]; then export GOARCH=386; fi
- if [[ "${TRAVIS_EVENT_TYPE}" = "cron" && -z "${RUN386}" ]]; then RACE=1; fi
- - if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then VET_SKIP_PROTO=1; fi
+ - if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then export VET_SKIP_PROTO=1; fi
install:
- try3() { eval "$*" || eval "$*" || eval "$*"; }
- try3 'if [[ "${GO111MODULE}" = "on" ]]; then go mod download; else make testdeps; fi'
- - if [[ "${GAE}" = 1 ]]; then source ./install_gae.sh; make testappenginedeps; fi
- - if [[ "${VET}" = 1 ]]; then ./vet.sh -install; fi
+ - if [[ -n "${GAE}" ]]; then source ./install_gae.sh; make testappenginedeps; fi
+ - if [[ -n "${VET}" ]]; then ./vet.sh -install; fi
script:
- set -e
- - if [[ "${VET}" = 1 ]]; then ./vet.sh; fi
- - if [[ "${GAE}" = 1 ]]; then make testappengine; exit 0; fi
- - if [[ "${RACE}" = 1 ]]; then make testrace; exit 0; fi
+ - if [[ -n "${TESTEXAMPLES}" ]]; then examples/examples_test.sh; exit 0; fi
+ - if [[ -n "${VET}" ]]; then ./vet.sh; fi
+ - if [[ -n "${GAE}" ]]; then make testappengine; exit 0; fi
+ - if [[ -n "${RACE}" ]]; then make testrace; exit 0; fi
- make test
diff --git a/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md b/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md
new file mode 100644
index 0000000..9d4213e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md
@@ -0,0 +1,3 @@
+## Community Code of Conduct
+
+gRPC follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
diff --git a/vendor/google.golang.org/grpc/CONTRIBUTING.md b/vendor/google.golang.org/grpc/CONTRIBUTING.md
index 6e69b28..4f1567e 100644
--- a/vendor/google.golang.org/grpc/CONTRIBUTING.md
+++ b/vendor/google.golang.org/grpc/CONTRIBUTING.md
@@ -1,6 +1,8 @@
# How to contribute
-We definitely welcome your patches and contributions to gRPC!
+We definitely welcome your patches and contributions to gRPC! Please read the gRPC
+organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md)
+and [contribution guidelines](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md) before proceeding.
If you are new to github, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/)
diff --git a/vendor/google.golang.org/grpc/GOVERNANCE.md b/vendor/google.golang.org/grpc/GOVERNANCE.md
new file mode 100644
index 0000000..d6ff267
--- /dev/null
+++ b/vendor/google.golang.org/grpc/GOVERNANCE.md
@@ -0,0 +1 @@
+This repository is governed by the gRPC organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md).
diff --git a/vendor/google.golang.org/grpc/MAINTAINERS.md b/vendor/google.golang.org/grpc/MAINTAINERS.md
new file mode 100644
index 0000000..093c82b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/MAINTAINERS.md
@@ -0,0 +1,27 @@
+This page lists all active maintainers of this repository. If you were a
+maintainer and would like to add your name to the Emeritus list, please send us a
+PR.
+
+See [GOVERNANCE.md](https://github.com/grpc/grpc-community/blob/master/governance.md)
+for governance guidelines and how to become a maintainer.
+See [CONTRIBUTING.md](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md)
+for general contribution guidelines.
+
+## Maintainers (in alphabetical order)
+- [canguler](https://github.com/canguler), Google LLC
+- [cesarghali](https://github.com/cesarghali), Google LLC
+- [dfawley](https://github.com/dfawley), Google LLC
+- [easwars](https://github.com/easwars), Google LLC
+- [jadekler](https://github.com/jadekler), Google LLC
+- [menghanl](https://github.com/menghanl), Google LLC
+- [srini100](https://github.com/srini100), Google LLC
+
+## Emeritus Maintainers (in alphabetical order)
+- [adelez](https://github.com/adelez), Google LLC
+- [iamqizhao](https://github.com/iamqizhao), Google LLC
+- [jtattermusch](https://github.com/jtattermusch), Google LLC
+- [lyuxuan](https://github.com/lyuxuan), Google LLC
+- [makmukhi](https://github.com/makmukhi), Google LLC
+- [matt-kwong](https://github.com/matt-kwong), Google LLC
+- [nicolasnoble](https://github.com/nicolasnoble), Google LLC
+- [yongni](https://github.com/yongni), Google LLC
diff --git a/vendor/google.golang.org/grpc/backoff.go b/vendor/google.golang.org/grpc/backoff.go
index 97c6e25..ff7c3ee 100644
--- a/vendor/google.golang.org/grpc/backoff.go
+++ b/vendor/google.golang.org/grpc/backoff.go
@@ -23,16 +23,36 @@
import (
"time"
+
+ "google.golang.org/grpc/backoff"
)
// DefaultBackoffConfig uses values specified for backoff in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+//
+// Deprecated: use ConnectParams instead. Will be supported throughout 1.x.
var DefaultBackoffConfig = BackoffConfig{
MaxDelay: 120 * time.Second,
}
// BackoffConfig defines the parameters for the default gRPC backoff strategy.
+//
+// Deprecated: use ConnectParams instead. Will be supported throughout 1.x.
type BackoffConfig struct {
// MaxDelay is the upper bound of backoff delay.
MaxDelay time.Duration
}
+
+// ConnectParams defines the parameters for connecting and retrying. Users are
+// encouraged to use this instead of the BackoffConfig type defined above. See
+// here for more details:
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+//
+// This API is EXPERIMENTAL.
+type ConnectParams struct {
+ // Backoff specifies the configuration options for connection backoff.
+ Backoff backoff.Config
+ // MinConnectTimeout is the minimum amount of time we are willing to give a
+ // connection to complete.
+ MinConnectTimeout time.Duration
+}
diff --git a/vendor/google.golang.org/grpc/backoff/backoff.go b/vendor/google.golang.org/grpc/backoff/backoff.go
new file mode 100644
index 0000000..0787d0b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/backoff/backoff.go
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package backoff provides configuration options for backoff.
+//
+// More details can be found at:
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+//
+// All APIs in this package are experimental.
+package backoff
+
+import "time"
+
+// Config defines the configuration options for backoff.
+type Config struct {
+ // BaseDelay is the amount of time to backoff after the first failure.
+ BaseDelay time.Duration
+ // Multiplier is the factor with which to multiply backoffs after a
+ // failed retry. Should ideally be greater than 1.
+ Multiplier float64
+ // Jitter is the factor with which backoffs are randomized.
+ Jitter float64
+ // MaxDelay is the upper bound of backoff delay.
+ MaxDelay time.Duration
+}
+
+// DefaultConfig is a backoff configuration with the default values specified
+// at https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+//
+// This should be useful for callers who want to configure backoff with
+// non-default values only for a subset of the options.
+var DefaultConfig = Config{
+ BaseDelay: 1.0 * time.Second,
+ Multiplier: 1.6,
+ Jitter: 0.2,
+ MaxDelay: 120 * time.Second,
+}
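For illustration, a minimal sketch (not part of this change) of how a caller could start from backoff.DefaultConfig and override only the knob it cares about, as the DefaultConfig comment suggests; the 30s cap is an arbitrary example value:

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/backoff"
)

func main() {
	// Copy the defaults (BaseDelay 1s, Multiplier 1.6, Jitter 0.2, MaxDelay 120s)
	// and override only MaxDelay.
	cfg := backoff.DefaultConfig
	cfg.MaxDelay = 30 * time.Second
	fmt.Printf("%+v\n", cfg)
}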
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index c266f4e..917c242 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -305,14 +305,23 @@
BalancerConfig serviceconfig.LoadBalancingConfig
}
+// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
+// problem with the provided name resolver data.
+var ErrBadResolverState = errors.New("bad resolver state")
+
// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
- // changes.
- UpdateClientConnState(ClientConnState)
+ // changes. If the error returned is ErrBadResolverState, the ClientConn
+ // will begin calling ResolveNow on the active name resolver with
+ // exponential backoff until a subsequent call to UpdateClientConnState
+ // returns a nil error. Any other errors are currently ignored.
+ UpdateClientConnState(ClientConnState) error
+ // ResolverError is called by gRPC when the name resolver reports an error.
+ ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
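A hedged sketch of the new V2Balancer contract from an implementer's side, assuming the balancer API at the version in this change; sketchBalancer and its empty-address check are hypothetical, only the method signatures and balancer.ErrBadResolverState come from the code above:

package examplebalancer

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
)

// sketchBalancer is a hypothetical V2Balancer that rejects empty address lists.
type sketchBalancer struct{}

// Legacy Balancer methods, unused when the V2 methods are present.
func (b *sketchBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {}
func (b *sketchBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)            {}
func (b *sketchBalancer) Close()                                                             {}

func (b *sketchBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	if len(s.ResolverState.Addresses) == 0 {
		// Returning this makes the ClientConn call ResolveNow with
		// exponential backoff until a later update succeeds.
		return balancer.ErrBadResolverState
	}
	// ... create or update SubConns here ...
	return nil
}

func (b *sketchBalancer) ResolverError(err error) {
	// Called when the name resolver reports an error; a real balancer might
	// swap in a picker that fails RPCs with this error.
}

func (b *sketchBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {}

var _ balancer.V2Balancer = (*sketchBalancer)(nil)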
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index 1af88f0..1a5c1aa 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -53,6 +53,8 @@
return bb.name
}
+var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
+
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
@@ -70,7 +72,11 @@
panic("not implemented")
}
-func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
+func (b *baseBalancer) ResolverError(error) {
+ // Ignore
+}
+
+func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
@@ -101,6 +107,7 @@
// The entry will be deleted in HandleSubConnStateChange.
}
}
+ return nil
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 8df4095..5356194 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -25,6 +25,8 @@
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/buffer"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
@@ -34,62 +36,14 @@
state connectivity.State
}
-// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
-// TODO make a general purpose buffer that uses interface{}.
-type scStateUpdateBuffer struct {
- c chan *scStateUpdate
- mu sync.Mutex
- backlog []*scStateUpdate
-}
-
-func newSCStateUpdateBuffer() *scStateUpdateBuffer {
- return &scStateUpdateBuffer{
- c: make(chan *scStateUpdate, 1),
- }
-}
-
-func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) == 0 {
- select {
- case b.c <- t:
- return
- default:
- }
- }
- b.backlog = append(b.backlog, t)
-}
-
-func (b *scStateUpdateBuffer) load() {
- b.mu.Lock()
- defer b.mu.Unlock()
- if len(b.backlog) > 0 {
- select {
- case b.c <- b.backlog[0]:
- b.backlog[0] = nil
- b.backlog = b.backlog[1:]
- default:
- }
- }
-}
-
-// get returns the channel that the scStateUpdate will be sent to.
-//
-// Upon receiving, the caller should call load to send another
-// scStateChangeTuple onto the channel if there is any.
-func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
- return b.c
-}
-
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
- cc *ClientConn
- balancer balancer.Balancer
- stateChangeQueue *scStateUpdateBuffer
- ccUpdateCh chan *balancer.ClientConnState
- done chan struct{}
+ cc *ClientConn
+ balancerMu sync.Mutex // synchronizes calls to the balancer
+ balancer balancer.Balancer
+ scBuffer *buffer.Unbounded
+ done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
@@ -97,11 +51,10 @@
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
- cc: cc,
- stateChangeQueue: newSCStateUpdateBuffer(),
- ccUpdateCh: make(chan *balancer.ClientConnState, 1),
- done: make(chan struct{}),
- subConns: make(map[*acBalancerWrapper]struct{}),
+ cc: cc,
+ scBuffer: buffer.NewUnbounded(),
+ done: grpcsync.NewEvent(),
+ subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
@@ -113,36 +66,23 @@
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
- case t := <-ccb.stateChangeQueue.get():
- ccb.stateChangeQueue.load()
- select {
- case <-ccb.done:
- ccb.balancer.Close()
- return
- default:
+ case t := <-ccb.scBuffer.Get():
+ ccb.scBuffer.Load()
+ if ccb.done.HasFired() {
+ break
}
+ ccb.balancerMu.Lock()
+ su := t.(*scStateUpdate)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
+ ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state})
} else {
- ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
+ ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
}
- case s := <-ccb.ccUpdateCh:
- select {
- case <-ccb.done:
- ccb.balancer.Close()
- return
- default:
- }
- if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateClientConnState(*s)
- } else {
- ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
- }
- case <-ccb.done:
+ ccb.balancerMu.Unlock()
+ case <-ccb.done.Done():
}
- select {
- case <-ccb.done:
+ if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
@@ -153,14 +93,12 @@
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
- default:
}
- ccb.cc.firstResolveEvent.Fire()
}
}
func (ccb *ccBalancerWrapper) close() {
- close(ccb.done)
+ ccb.done.Fire()
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -174,30 +112,28 @@
if sc == nil {
return
}
- ccb.stateChangeQueue.put(&scStateUpdate{
+ ccb.scBuffer.Put(&scStateUpdate{
sc: sc,
state: s,
})
}
-func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
- if ccb.cc.curBalancerName != grpclbName {
- // Filter any grpclb addresses since we don't have the grpclb balancer.
- s := &ccs.ResolverState
- for i := 0; i < len(s.Addresses); {
- if s.Addresses[i].Type == resolver.GRPCLB {
- copy(s.Addresses[i:], s.Addresses[i+1:])
- s.Addresses = s.Addresses[:len(s.Addresses)-1]
- continue
- }
- i++
- }
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
+ ccb.balancerMu.Lock()
+ defer ccb.balancerMu.Unlock()
+ if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+ return ub.UpdateClientConnState(*ccs)
}
- select {
- case <-ccb.ccUpdateCh:
- default:
+ ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
+ return nil
+}
+
+func (ccb *ccBalancerWrapper) resolverError(err error) {
+ if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
+ ccb.balancerMu.Lock()
+ ub.ResolverError(err)
+ ccb.balancerMu.Unlock()
}
- ccb.ccUpdateCh <- ccs
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index a7643df..4414ba8 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -31,7 +31,7 @@
"time"
"google.golang.org/grpc/balancer"
- _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ "google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
@@ -42,10 +42,12 @@
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
- _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
- _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
+
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)
const (
@@ -186,11 +188,11 @@
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
- sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
- if err != nil {
- return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
+ scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
+ if scpr.Err != nil {
+ return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
- cc.dopts.defaultServiceConfig = sc
+ cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
@@ -235,9 +237,7 @@
}
}
if cc.dopts.bs == nil {
- cc.dopts.bs = backoff.Exponential{
- MaxDelay: DefaultBackoffConfig.MaxDelay,
- }
+ cc.dopts.bs = backoff.DefaultExponential
}
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
@@ -443,7 +443,18 @@
return csm.notifyChan
}
-// ClientConn represents a client connection to an RPC server.
+// ClientConn represents a virtual connection to a conceptual endpoint, to
+// perform RPCs.
+//
+// A ClientConn is free to have zero or more actual connections to the endpoint
+// based on configuration, load, etc. It is also free to determine which actual
+// endpoints to use and may change it every RPC, permitting client-side load
+// balancing.
+//
+// A ClientConn encapsulates a range of functionality including name
+// resolution, TCP connection establishment (with retries and backoff) and TLS
+// handshakes. It also handles errors on established connections by
+// re-resolving the name and reconnecting.
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
@@ -532,58 +543,104 @@
}
}
-func (cc *ClientConn) updateResolverState(s resolver.State) error {
+var emptyServiceConfig *ServiceConfig
+
+func init() {
+ cfg := parseServiceConfig("{}")
+ if cfg.Err != nil {
+ panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
+ }
+ emptyServiceConfig = cfg.Config.(*ServiceConfig)
+}
+
+func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
+ if cc.sc != nil {
+ cc.applyServiceConfigAndBalancer(cc.sc, addrs)
+ return
+ }
+ if cc.dopts.defaultServiceConfig != nil {
+ cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
+ } else {
+ cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
+ }
+}
+
+func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
+ defer cc.firstResolveEvent.Fire()
cc.mu.Lock()
- defer cc.mu.Unlock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
+ cc.mu.Unlock()
return nil
}
- if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
- if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
- cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
+ if err != nil {
+ // May need to apply the initial service config in case the resolver
+ // doesn't support service configs, or doesn't provide a service config
+ // with the new addresses.
+ cc.maybeApplyDefaultServiceConfig(nil)
+
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.resolverError(err)
}
- } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
- cc.applyServiceConfig(sc)
+
+ // No addresses are valid with err set; return early.
+ cc.mu.Unlock()
+ return balancer.ErrBadResolverState
+ }
+
+ var ret error
+ if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
+ cc.maybeApplyDefaultServiceConfig(s.Addresses)
+ // TODO: do we need to apply a failing LB policy if there is no
+ // default, per the error handling design?
+ } else {
+ if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
+ cc.applyServiceConfigAndBalancer(sc, s.Addresses)
+ } else {
+ ret = balancer.ErrBadResolverState
+ if cc.balancerWrapper == nil {
+ var err error
+ if s.ServiceConfig.Err != nil {
+ err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
+ } else {
+ err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
+ }
+ cc.blockingpicker.updatePicker(base.NewErrPicker(err))
+ cc.csMgr.updateState(connectivity.TransientFailure)
+ cc.mu.Unlock()
+ return ret
+ }
+ }
}
var balCfg serviceconfig.LoadBalancingConfig
- if cc.dopts.balancerBuilder == nil {
- // Only look at balancer types and switch balancer if balancer dial
- // option is not set.
- var newBalancerName string
- if cc.sc != nil && cc.sc.lbConfig != nil {
- newBalancerName = cc.sc.lbConfig.name
- balCfg = cc.sc.lbConfig.cfg
- } else {
- var isGRPCLB bool
- for _, a := range s.Addresses {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
- } else {
- newBalancerName = PickFirstBalancerName
- }
- }
- cc.switchBalancer(newBalancerName)
- } else if cc.balancerWrapper == nil {
- // Balancer dial option was set, and this is the first time handling
- // resolved addresses. Build a balancer with dopts.balancerBuilder.
- cc.curBalancerName = cc.dopts.balancerBuilder.Name()
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
+ balCfg = cc.sc.lbConfig.cfg
}
- cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
- return nil
+ cbn := cc.curBalancerName
+ bw := cc.balancerWrapper
+ cc.mu.Unlock()
+ if cbn != grpclbName {
+ // Filter any grpclb addresses since we don't have the grpclb balancer.
+ for i := 0; i < len(s.Addresses); {
+ if s.Addresses[i].Type == resolver.GRPCLB {
+ copy(s.Addresses[i:], s.Addresses[i+1:])
+ s.Addresses = s.Addresses[:len(s.Addresses)-1]
+ continue
+ }
+ i++
+ }
+ }
+ uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
+ if ret == nil {
+ ret = uccsErr // prefer ErrBadResolver state since any other error is
+ // currently meaningless to the caller.
+ }
+ return ret
}
// switchBalancer starts the switching from current balancer to the balancer
@@ -831,10 +888,10 @@
return t, done, nil
}
-func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
+func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
- return fmt.Errorf("got nil pointer for service config")
+ return
}
cc.sc = sc
@@ -850,7 +907,35 @@
cc.retryThrottler.Store((*retryThrottler)(nil))
}
- return nil
+ if cc.dopts.balancerBuilder == nil {
+ // Only look at balancer types and switch balancer if balancer dial
+ // option is not set.
+ var newBalancerName string
+ if cc.sc != nil && cc.sc.lbConfig != nil {
+ newBalancerName = cc.sc.lbConfig.name
+ } else {
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
+ }
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else if cc.sc != nil && cc.sc.LB != nil {
+ newBalancerName = *cc.sc.LB
+ } else {
+ newBalancerName = PickFirstBalancerName
+ }
+ }
+ cc.switchBalancer(newBalancerName)
+ } else if cc.balancerWrapper == nil {
+ // Balancer dial option was set, and this is the first time handling
+ // resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.curBalancerName = cc.dopts.balancerBuilder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ }
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
@@ -875,8 +960,9 @@
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
cc.mu.Lock()
- defer cc.mu.Unlock()
- for ac := range cc.conns {
+ conns := cc.conns
+ cc.mu.Unlock()
+ for ac := range conns {
ac.resetConnectBackoff()
}
}
@@ -1136,10 +1222,16 @@
onCloseCalled := make(chan struct{})
reconnect := grpcsync.NewEvent()
+ authority := ac.cc.authority
+	// addr.ServerName takes precedence over ClientConn authority, if present.
+ if addr.ServerName != "" {
+ authority = addr.ServerName
+ }
+
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
- Authority: ac.cc.authority,
+ Authority: authority,
}
once := sync.Once{}
@@ -1331,7 +1423,7 @@
curTr := ac.transport
ac.transport = nil
// We have to set the state to Shutdown before anything else to prevent races
- // between setting the state and logic that waits on context cancelation / etc.
+ // between setting the state and logic that waits on context cancellation / etc.
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.curAddr = resolver.Address{}
@@ -1355,7 +1447,7 @@
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
- // the entity beng deleted, and thus prevent it from being deleted right away.
+ // the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(ac.channelzID)
}
ac.mu.Unlock()
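For context, a small usage sketch of the default-service-config path touched above (defaultServiceConfigRawJSON is parsed at Dial time and rejected with the invalidDefaultServiceConfigErrPrefix error if malformed), using the existing grpc.WithDefaultServiceConfig option; the target and JSON here are placeholder values:

package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// Hypothetical target; the dial is non-blocking so no server is needed
	// for the service config to be parsed and applied.
	conn, err := grpc.Dial("my-service.example.com:50051",
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}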
diff --git a/vendor/google.golang.org/grpc/codegen.sh b/vendor/google.golang.org/grpc/codegen.sh
old mode 100755
new mode 100644
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 8ea3d4a..c690161 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -30,10 +30,11 @@
"fmt"
"io/ioutil"
"net"
- "strings"
"github.com/golang/protobuf/proto"
+
"google.golang.org/grpc/credentials/internal"
+ ginternal "google.golang.org/grpc/internal"
)
// PerRPCCredentials defines the common interface for the credentials which need to
@@ -45,7 +46,8 @@
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
- // timeout and cancellation.
+ // timeout and cancellation. Additionally, RequestInfo data will be
+ // available via ctx to this call.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
@@ -166,11 +168,12 @@
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := cloneTLSConfig(c.config)
if cfg.ServerName == "" {
- colonPos := strings.LastIndex(authority, ":")
- if colonPos == -1 {
- colonPos = len(authority)
+ serverName, _, err := net.SplitHostPort(authority)
+ if err != nil {
+ // If the authority had no host port or if the authority cannot be parsed, use it as-is.
+ serverName = authority
}
- cfg.ServerName = authority[:colonPos]
+ cfg.ServerName = serverName
}
conn := tls.Client(rawConn, cfg)
errChannel := make(chan error, 1)
@@ -334,3 +337,28 @@
return cfg.Clone()
}
+
+// RequestInfo contains request data attached to the context passed to GetRequestMetadata calls.
+//
+// This API is experimental.
+type RequestInfo struct {
+ // The method passed to Invoke or NewStream for this RPC. (For proto methods, this has the format "/some.Service/Method")
+ Method string
+}
+
+// requestInfoKey is a struct to be used as the key when attaching a RequestInfo to a context object.
+type requestInfoKey struct{}
+
+// RequestInfoFromContext extracts the RequestInfo from the context if it exists.
+//
+// This API is experimental.
+func RequestInfoFromContext(ctx context.Context) (ri RequestInfo, ok bool) {
+ ri, ok = ctx.Value(requestInfoKey{}).(RequestInfo)
+ return
+}
+
+func init() {
+ ginternal.NewRequestInfoContext = func(ctx context.Context, ri RequestInfo) context.Context {
+ return context.WithValue(ctx, requestInfoKey{}, ri)
+ }
+}
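A hypothetical PerRPCCredentials implementation showing how the new RequestInfo plumbing can be consumed from GetRequestMetadata; methodTokenCreds and its token map are made up for illustration, only RequestInfoFromContext and the interface signatures come from the file above:

package examplecreds

import (
	"context"

	"google.golang.org/grpc/credentials"
)

// methodTokenCreds attaches a per-method bearer token, keyed by the RPC
// method name now available via the context.
type methodTokenCreds struct {
	tokens map[string]string // method -> token, assumed to be populated by the caller
}

func (c methodTokenCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	ri, ok := credentials.RequestInfoFromContext(ctx)
	if !ok {
		return nil, nil // no RequestInfo attached; send nothing
	}
	if tok, found := c.tokens[ri.Method]; found {
		return map[string]string{"authorization": "Bearer " + tok}, nil
	}
	return nil, nil
}

func (c methodTokenCreds) RequireTransportSecurity() bool { return true }

// Wired in with grpc.WithPerRPCCredentials(methodTokenCreds{tokens: ...}).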
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index e8f34d0..9f872df 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -24,11 +24,12 @@
"net"
"time"
+ "google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/backoff"
+ internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -47,7 +48,7 @@
cp Compressor
dc Decompressor
- bs backoff.Strategy
+ bs internalbackoff.Strategy
block bool
insecure bool
timeout time.Duration
@@ -68,6 +69,10 @@
minConnectTimeout func() time.Duration
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string
+ // This is used by ccResolverWrapper to backoff between successive calls to
+ // resolver.ResolveNow(). The user will have no need to configure this, but
+ // we need to be able to configure this in tests.
+ resolveNowBackoff func(int) time.Duration
}
// DialOption configures how we set up the connection.
@@ -246,8 +251,28 @@
})
}
+// WithConnectParams configures the dialer to use the provided ConnectParams.
+//
+// The backoff configuration specified as part of the ConnectParams overrides
+// all defaults specified in
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider
+// using the backoff.DefaultConfig as a base, in cases where you want to
+// override only a subset of the backoff configuration.
+//
+// This API is EXPERIMENTAL.
+func WithConnectParams(p ConnectParams) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.bs = internalbackoff.Exponential{Config: p.Backoff}
+ o.minConnectTimeout = func() time.Duration {
+ return p.MinConnectTimeout
+ }
+ })
+}
+
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
+//
+// Deprecated: use WithConnectParams instead. Will be supported throughout 1.x.
func WithBackoffMaxDelay(md time.Duration) DialOption {
return WithBackoffConfig(BackoffConfig{MaxDelay: md})
}
@@ -255,19 +280,18 @@
// WithBackoffConfig configures the dialer to use the provided backoff
// parameters after connection failures.
//
-// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
-// for use.
+// Deprecated: use WithConnectParams instead. Will be supported throughout 1.x.
func WithBackoffConfig(b BackoffConfig) DialOption {
- return withBackoff(backoff.Exponential{
- MaxDelay: b.MaxDelay,
- })
+ bc := backoff.DefaultConfig
+ bc.MaxDelay = b.MaxDelay
+ return withBackoff(internalbackoff.Exponential{Config: bc})
}
// withBackoff sets the backoff strategy used for connectRetryNum after a failed
// connection attempt.
//
// This can be exported if arbitrary backoff strategies are allowed by gRPC.
-func withBackoff(bs backoff.Strategy) DialOption {
+func withBackoff(bs internalbackoff.Strategy) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.bs = bs
})
@@ -539,6 +563,7 @@
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
},
+ resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
}
}
@@ -552,3 +577,13 @@
o.minConnectTimeout = f
})
}
+
+// withResolveNowBackoff specifies the function that clientconn uses to backoff
+// between successive calls to resolver.ResolveNow().
+//
+// For testing purpose only.
+func withResolveNowBackoff(f func(int) time.Duration) DialOption {
+ return newFuncDialOption(func(o *dialOptions) {
+ o.resolveNowBackoff = f
+ })
+}
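A minimal dial sketch using the WithConnectParams option added above; localhost:50051 and the 20s/10s values are placeholders:

package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
)

func main() {
	// Start from the defaults and override only the backoff cap.
	bc := backoff.DefaultConfig
	bc.MaxDelay = 20 * time.Second

	conn, err := grpc.Dial("localhost:50051",
		grpc.WithInsecure(),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff:           bc,
			MinConnectTimeout: 10 * time.Second,
		}),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}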
diff --git a/vendor/google.golang.org/grpc/encoding/encoding.go b/vendor/google.golang.org/grpc/encoding/encoding.go
index 30a75da..195e844 100644
--- a/vendor/google.golang.org/grpc/encoding/encoding.go
+++ b/vendor/google.golang.org/grpc/encoding/encoding.go
@@ -46,6 +46,10 @@
// coding header. The result must be static; the result cannot change
// between calls.
Name() string
+ // EXPERIMENTAL: if a Compressor implements
+ // DecompressedSize(compressedBytes []byte) int, gRPC will call it
+ // to determine the size of the buffer allocated for the result of decompression.
+ // Return -1 to indicate unknown size.
}
var registeredCompressor = make(map[string]Compressor)
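A sketch of a Compressor that also provides the optional DecompressedSize method described above, assuming a gzip-style stream whose ISIZE trailer carries the uncompressed size; sizedGzip is hypothetical and not the codec gRPC registers by default:

package examplecodec

import (
	"compress/gzip"
	"io"
)

// sizedGzip is a gzip Compressor that can report the decompressed size so
// gRPC can allocate the destination buffer up front.
type sizedGzip struct{}

func (sizedGzip) Name() string { return "gzip" }

func (sizedGzip) Compress(w io.Writer) (io.WriteCloser, error) {
	return gzip.NewWriter(w), nil
}

func (sizedGzip) Decompress(r io.Reader) (io.Reader, error) {
	return gzip.NewReader(r)
}

// DecompressedSize implements the optional method; returning -1 signals an
// unknown size and gRPC falls back to growing the buffer as needed.
func (sizedGzip) DecompressedSize(compressedBytes []byte) int {
	if len(compressedBytes) < 4 {
		return -1
	}
	// The last 4 bytes of a gzip stream hold the uncompressed size mod 2^32,
	// little-endian (RFC 1952 ISIZE).
	b := compressedBytes[len(compressedBytes)-4:]
	return int(uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24)
}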
diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod
index c1a8340..dc56aa7 100644
--- a/vendor/google.golang.org/grpc/go.mod
+++ b/vendor/google.golang.org/grpc/go.mod
@@ -1,19 +1,16 @@
module google.golang.org/grpc
+go 1.11
+
require (
- cloud.google.com/go v0.26.0 // indirect
- github.com/BurntSushi/toml v0.3.1 // indirect
- github.com/client9/misspell v0.3.4
+ github.com/envoyproxy/go-control-plane v0.9.0
+ github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/mock v1.1.1
- github.com/golang/protobuf v1.2.0
+ github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.2.0
- golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3
golang.org/x/net v0.0.0-20190311183353-d8887717615a
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
- golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135
- google.golang.org/appengine v1.1.0 // indirect
- google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8
- honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc
+ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
)
diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum
index 741677d..f6a4784 100644
--- a/vendor/google.golang.org/grpc/go.sum
+++ b/vendor/google.golang.org/grpc/go.sum
@@ -1,37 +1,53 @@
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/envoyproxy/go-control-plane v0.9.0 h1:67WMNTvGrl7V1dWdKCeTwxDr7nio9clKoTlLhwIPnT4=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
-github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
-golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
-google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
-honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/vendor/google.golang.org/grpc/grpclog/grpclog.go b/vendor/google.golang.org/grpc/grpclog/grpclog.go
index 51bb945..874ea6d 100644
--- a/vendor/google.golang.org/grpc/grpclog/grpclog.go
+++ b/vendor/google.golang.org/grpc/grpclog/grpclog.go
@@ -89,7 +89,7 @@
}
// Fatalf logs to the FATAL log. Arguments are handled in the manner of fmt.Printf.
-// It calles os.Exit() with exit code 1.
+// It calls os.Exit() with exit code 1.
func Fatalf(format string, args ...interface{}) {
logger.Fatalf(format, args...)
// Make sure fatal logs will exit.
diff --git a/vendor/google.golang.org/grpc/install_gae.sh b/vendor/google.golang.org/grpc/install_gae.sh
old mode 100755
new mode 100644
diff --git a/vendor/google.golang.org/grpc/internal/backoff/backoff.go b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
index 1bd0cce..5fc0ee3 100644
--- a/vendor/google.golang.org/grpc/internal/backoff/backoff.go
+++ b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
@@ -25,44 +25,39 @@
import (
"time"
+ grpcbackoff "google.golang.org/grpc/backoff"
"google.golang.org/grpc/internal/grpcrand"
)
// Strategy defines the methodology for backing off after a grpc connection
// failure.
-//
type Strategy interface {
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
Backoff(retries int) time.Duration
}
-const (
- // baseDelay is the amount of time to wait before retrying after the first
- // failure.
- baseDelay = 1.0 * time.Second
- // factor is applied to the backoff after each retry.
- factor = 1.6
- // jitter provides a range to randomize backoff delays.
- jitter = 0.2
-)
+// DefaultExponential is an exponential backoff implementation using the
+// default values for all the configurable knobs defined in
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+var DefaultExponential = Exponential{Config: grpcbackoff.DefaultConfig}
// Exponential implements exponential backoff algorithm as defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
type Exponential struct {
- // MaxDelay is the upper bound of backoff delay.
- MaxDelay time.Duration
+ // Config contains all options to configure the backoff algorithm.
+ Config grpcbackoff.Config
}
// Backoff returns the amount of time to wait before the next retry given the
// number of retries.
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {
- return baseDelay
+ return bc.Config.BaseDelay
}
- backoff, max := float64(baseDelay), float64(bc.MaxDelay)
+ backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
for backoff < max && retries > 0 {
- backoff *= factor
+ backoff *= bc.Config.Multiplier
retries--
}
if backoff > max {
@@ -70,7 +65,7 @@
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
- backoff *= 1 + jitter*(grpcrand.Float64()*2-1)
+ backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {
return 0
}
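To make the Config-driven schedule concrete, a standalone sketch that reproduces the pre-jitter delays computed by Exponential.Backoff under the default knobs (the real delays are additionally randomized by the ±20% jitter factor):

package main

import (
	"fmt"
	"time"
)

func main() {
	// BaseDelay 1s, Multiplier 1.6, MaxDelay 120s, as in backoff.DefaultConfig.
	base, mult, max := float64(time.Second), 1.6, float64(120*time.Second)
	for retries := 0; retries <= 12; retries++ {
		d := base
		for r := retries; d < max && r > 0; r-- {
			d *= mult
		}
		if d > max {
			d = max
		}
		fmt.Printf("retry %2d: ~%v (before ±20%% jitter)\n", retries, time.Duration(d))
	}
}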
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index fee6aec..4062c02 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -34,7 +34,7 @@
}
// binLogger is the global binary logger for the binary. One of this should be
-// built at init time from the configuration (environment varialbe or flags).
+// built at init time from the configuration (environment variable or flags).
//
// It is used to get a methodLogger for each individual method.
var binLogger Logger
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
index 4cc2525..be30d0e 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
@@ -43,7 +43,7 @@
// Foo.
//
// If two configs exist for one certain method or service, the one specified
-// later overrides the privous config.
+// later overrides the previous config.
func NewLoggerFromConfigString(s string) Logger {
if s == "" {
return nil
@@ -74,7 +74,7 @@
return fmt.Errorf("invalid config: %q, %v", config, err)
}
if m == "*" {
- return fmt.Errorf("invalid config: %q, %v", config, "* not allowd in blacklist config")
+ return fmt.Errorf("invalid config: %q, %v", config, "* not allowed in blacklist config")
}
if suffix != "" {
return fmt.Errorf("invalid config: %q, %v", config, "header/message limit not allowed in blacklist config")
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh b/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh
old mode 100755
new mode 100644
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 20d044f..a2e7c34 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -63,7 +63,7 @@
// newWriterSink creates a binary log sink with the given writer.
//
-// Write() marshalls the proto message and writes it to the given writer. Each
+// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
new file mode 100644
index 0000000..2cb3109
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package buffer provides an implementation of an unbounded buffer.
+package buffer
+
+import "sync"
+
+// Unbounded is an implementation of an unbounded buffer which does not use
+// extra goroutines. This is typically used for passing updates from one entity
+// to another within gRPC.
+//
+// All methods on this type are thread-safe and don't block on anything except
+// the underlying mutex used for synchronization.
+type Unbounded struct {
+ c chan interface{}
+ mu sync.Mutex
+ backlog []interface{}
+}
+
+// NewUnbounded returns a new instance of Unbounded.
+func NewUnbounded() *Unbounded {
+ return &Unbounded{c: make(chan interface{}, 1)}
+}
+
+// Put adds t to the unbounded buffer.
+func (b *Unbounded) Put(t interface{}) {
+ b.mu.Lock()
+ if len(b.backlog) == 0 {
+ select {
+ case b.c <- t:
+ b.mu.Unlock()
+ return
+ default:
+ }
+ }
+ b.backlog = append(b.backlog, t)
+ b.mu.Unlock()
+}
+
+// Load sends the earliest buffered data, if any, onto the read channel
+// returned by Get(). Users are expected to call this every time they read a
+// value from the read channel.
+func (b *Unbounded) Load() {
+ b.mu.Lock()
+ if len(b.backlog) > 0 {
+ select {
+ case b.c <- b.backlog[0]:
+ b.backlog[0] = nil
+ b.backlog = b.backlog[1:]
+ default:
+ }
+ }
+ b.mu.Unlock()
+}
+
+// Get returns a read channel on which values added to the buffer, via Put(),
+// are sent on.
+//
+// Upon reading a value from this channel, users are expected to call Load() to
+// send the next buffered value onto the channel if there is any.
+func (b *Unbounded) Get() <-chan interface{} {
+ return b.c
+}
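A usage sketch of the Put/Get/Load discipline expected by the new buffer.Unbounded type (it mirrors what ccBalancerWrapper.watcher does); since the package is internal it only compiles from inside the grpc module, e.g. as a _test.go file there:

package buffer_test

import (
	"fmt"

	"google.golang.org/grpc/internal/buffer"
)

func Example_unbounded() {
	b := buffer.NewUnbounded()

	// Producer side: Put never blocks; overflow is kept in the backlog slice.
	for i := 0; i < 3; i++ {
		b.Put(i)
	}

	// Consumer side: after every receive, call Load so the next backlog item
	// (if any) is moved onto the channel.
	for i := 0; i < 3; i++ {
		v := <-b.Get()
		b.Load()
		fmt.Println(v)
	}
	// Output:
	// 0
	// 1
	// 2
}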
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index bc1f99a..b96b359 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -28,9 +28,9 @@
)
var (
- // WithResolverBuilder is exported by dialoptions.go
+ // WithResolverBuilder is set by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
- // WithHealthCheckFunc is not exported by dialoptions.go
+ // WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
@@ -39,14 +39,17 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
- // ParseServiceConfig is a function to parse JSON service configs into
- // opaque data structures.
- ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
// the caller.
StatusRawProto interface{} // func (*status.Status) *spb.Status
+ // NewRequestInfoContext creates a new context based on the argument context attaching
+ // the passed in RequestInfo to the new context.
+ NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
+ // ParseServiceConfigForTesting is for creating a fake
+ // ClientConn for resolver testing only
+ ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
similarity index 97%
rename from vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
rename to vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index 297492e..abc0f92 100644
--- a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -32,8 +32,9 @@
"sync"
"time"
+ "google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/backoff"
+ internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
)
@@ -126,9 +127,11 @@
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
+ bc := backoff.DefaultConfig
+ bc.MaxDelay = b.minFreq
d := &dnsResolver{
freq: b.minFreq,
- backoff: backoff.Exponential{MaxDelay: b.minFreq},
+ backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
@@ -200,7 +203,7 @@
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
freq time.Duration
- backoff backoff.Exponential
+ backoff internalbackoff.Exponential
retryCount int
host string
port string
diff --git a/vendor/google.golang.org/grpc/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
similarity index 100%
rename from vendor/google.golang.org/grpc/resolver/passthrough/passthrough.go
rename to vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index b8e0aa4..ddee20b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -107,8 +107,8 @@
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) (bool, error) // Used only on the client side.
+ endStream bool // Valid on server side.
+ initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
@@ -637,21 +637,17 @@
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
- sendPing, err := hdr.initStream(str.id)
- if err != nil {
+ if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
- if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
- if sendPing {
- return l.pingHandler(&ping{data: [8]byte{}})
- }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 41a79c5..294661a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -35,6 +35,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
@@ -46,6 +47,7 @@
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
+ lastRead int64 // keep this field 64-bit aligned
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -62,8 +64,6 @@
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -77,9 +77,6 @@
perRPCCreds []credentials.PerRPCCredentials
- // Boolean to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
@@ -110,6 +107,16 @@
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // A condition variable used to signal when the keepalive goroutine should
+ // go dormant. The condition for dormancy is based on the number of active
+ // streams and the `PermitWithoutStream` keepalive client parameter. And
+ // since the number of active streams is guarded by the above mutex, we use
+ // the same for this condition variable as well.
+ kpDormancyCond *sync.Cond
+ // A boolean to track whether the keepalive goroutine is dormant or not.
+ // This is checked before attempting to signal the above condition
+ // variable.
+ kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -232,7 +239,6 @@
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -264,9 +270,6 @@
updateFlowControl: t.updateFlowControl,
}
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -281,6 +284,7 @@
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
+ t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -347,6 +351,7 @@
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
+ ct: t,
done: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
@@ -380,23 +385,23 @@
}
func (t *http2Client) getPeer() *peer.Peer {
- pr := &peer.Peer{
- Addr: t.remoteAddr,
+ return &peer.Peer{
+ Addr: t.remoteAddr,
+ AuthInfo: t.authInfo,
}
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- return pr
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
- authData, err := t.getTrAuthData(ctx, aud)
+ ri := credentials.RequestInfo{
+ Method: callHdr.Method,
+ }
+ ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
+ authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
}
- callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
+ callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
if err != nil {
return nil, err
}
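
With credentials.RequestInfo now attached to the context handed to per-RPC credentials, a credentials implementation can see which method is being invoked. A hedged sketch, assuming the credentials.RequestInfoFromContext accessor that accompanies this change in the same release; the type name and token value are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/credentials"
)

type methodAwareCreds struct{}

func (methodAwareCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	// New in this version: the per-RPC method is available on the context.
	if ri, ok := credentials.RequestInfoFromContext(ctx); ok {
		fmt.Println("issuing credentials for", ri.Method)
	}
	return map[string]string{"authorization": "Bearer example-token"}, nil
}

func (methodAwareCreds) RequireTransportSecurity() bool { return true }

func main() {
	// Typically installed with grpc.WithPerRPCCredentials(methodAwareCreds{}).
	var _ credentials.PerRPCCredentials = methodAwareCreds{}
}
```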
@@ -564,7 +569,7 @@
hdr := &headerFrame{
hf: headerFields,
endStream: false,
- initStream: func(id uint32) (bool, error) {
+ initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -574,29 +579,19 @@
err = ErrConnClosing
}
cleanup(err)
- return false, err
+ return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
- var sendPing bool
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 && t.keepaliveEnabled {
- select {
- case t.awakenKeepalive <- struct{}{}:
- sendPing = true
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
- }
+ // If the keepalive goroutine has gone dormant, wake it up.
+ if t.kpDormant {
+ t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- return sendPing, nil
+ return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -778,6 +773,11 @@
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
+ if t.kpDormant {
+ // If the keepalive goroutine is blocked on this condition variable, we
+ // should unblock it so that the goroutine eventually exits.
+ t.kpDormancyCond.Signal()
+ }
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
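
Both the stream-creation path and Close now wake the keepalive goroutine through kpDormancyCond, and both signal while holding t.mu, which is also the Cond's locker. A condensed, self-contained sketch of that producer side; the type and method names are illustrative, not the vendored code:

```go
package main

import "sync"

// clientTransport is an illustrative stand-in for http2Client; only the
// pieces needed to show the dormancy handshake are included.
type clientTransport struct {
	mu             sync.Mutex
	activeStreams  map[uint32]struct{}
	kpDormant      bool
	kpDormancyCond *sync.Cond
	closing        bool
}

func (t *clientTransport) addStream(id uint32) {
	t.mu.Lock()
	t.activeStreams[id] = struct{}{}
	if t.kpDormant {
		// keepalive() is parked in Wait(); wake it so pinging resumes.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
}

func (t *clientTransport) close() {
	t.mu.Lock()
	t.closing = true
	if t.kpDormant {
		// Unblock keepalive() so it can observe the closing state and exit.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
}

func main() {
	t := &clientTransport{activeStreams: make(map[uint32]struct{})}
	t.kpDormancyCond = sync.NewCond(&t.mu)
	t.addStream(1)
	t.close()
}
```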
@@ -853,11 +853,11 @@
return t.controlBuf.put(df)
}
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
+func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
+ s := t.activeStreams[f.Header().StreamID]
+ t.mu.Unlock()
+ return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -937,8 +937,8 @@
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if size > 0 {
@@ -969,8 +969,8 @@
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -1147,8 +1147,8 @@
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
+ s := t.getStream(frame)
+ if s == nil {
return
}
endStream := frame.StreamEnded()
@@ -1191,6 +1191,7 @@
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
+ s.headerValid = true
if !endStream {
// HEADERS frame block carries a Response-Headers.
isHeader = true
@@ -1233,7 +1234,7 @@
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
@@ -1248,7 +1249,7 @@
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
@@ -1292,56 +1293,84 @@
}
}
+func minTime(a, b time.Duration) time.Duration {
+ if a < b {
+ return a
+ }
+ return b
+}
+
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
+ // True iff a ping has been sent, and no data has been received since then.
+ outstandingPing := false
+ // Amount of time remaining before which we should receive an ACK for the
+ // last sent ping.
+ timeoutLeft := time.Duration(0)
+ // Records the last value of t.lastRead before we go block on the timer.
+ // This is required to check for read activity since then.
+ prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
+ lastRead := atomic.LoadInt64(&t.lastRead)
+ if lastRead > prevNano {
+ // There has been read activity since the last time we were here.
+ outstandingPing = false
+ // Next timer should fire at kp.Time seconds from lastRead time.
+ timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+ prevNano = lastRead
continue
}
- // Check if keepalive should go dormant.
+ if outstandingPing && timeoutLeft <= 0 {
+ t.Close()
+ return
+ }
t.mu.Lock()
+ if t.state == closing {
+ // If the transport is closing, we should exit from the
+ // keepalive goroutine here. If not, we could have a race
+ // between the call to Signal() from Close() and the call to
+ // Wait() here, whereby the keepalive goroutine ends up
+ // blocking on the condition variable which will never be
+ // signalled again.
+ t.mu.Unlock()
+ return
+ }
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
- t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.ctx.Done():
- return
- }
- } else {
- t.mu.Unlock()
+ // If a ping was sent out previously (because there were active
+ // streams at that point) which wasn't acked and its timeout
+ // hadn't fired, but we got here and are about to go dormant,
+ // we should make sure that we unconditionally send a ping once
+ // we awaken.
+ outstandingPing = false
+ t.kpDormant = true
+ t.kpDormancyCond.Wait()
+ }
+ t.kpDormant = false
+ t.mu.Unlock()
+
+ // We get here either because we were dormant and a new stream was
+ // created which unblocked the Wait() call, or because the
+ // keepalive timer expired. In both cases, we need to send a ping.
+ if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
- // Send ping.
t.controlBuf.put(p)
+ timeoutLeft = t.kp.Timeout
+ outstandingPing = true
}
-
- // By the time control gets here a ping has been sent one way or the other.
- timer.Reset(t.kp.Timeout)
- select {
- case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
- continue
- }
- infof("transport: closing client transport due to idleness.")
- t.Close()
- return
- case <-t.ctx.Done():
- if !timer.Stop() {
- <-timer.C
- }
- return
- }
+ // The amount of time to sleep here is the minimum of kp.Time and
+ // timeoutLeft. This will ensure that we wait only for kp.Time
+ // before sending out the next ping (for cases where the ping is
+ // acked).
+ sleepDuration := minTime(t.kp.Time, timeoutLeft)
+ timeoutLeft -= sleepDuration
+ prevNano = lastRead
+ timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
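
The rewritten keepalive loop replaces the old activity flag with an atomically published lastRead timestamp: if any read happened while the timer was sleeping, the next firing is rescheduled to kp.Time after that read; otherwise a ping is sent and a separate timeoutLeft budget is burned down. A condensed, runnable sketch of just that timer arithmetic (dormancy and shutdown are omitted here; see the hunks above), with illustrative parameters:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func minTime(a, b time.Duration) time.Duration {
	if a < b {
		return a
	}
	return b
}

// keepaliveLoop is illustrative; the real loop also handles dormancy and
// transport shutdown.
func keepaliveLoop(lastRead *int64, kpTime, kpTimeout time.Duration, sendPing func()) {
	outstandingPing := false
	timeoutLeft := time.Duration(0)
	prevNano := time.Now().UnixNano()
	timer := time.NewTimer(kpTime)
	for i := 0; i < 3; i++ { // bounded here only so the sketch terminates
		<-timer.C
		last := atomic.LoadInt64(lastRead)
		if last > prevNano {
			// Data arrived while we slept: no ping needed; fire kp.Time
			// after the most recent read instead.
			outstandingPing = false
			timer.Reset(time.Duration(last) + kpTime - time.Duration(time.Now().UnixNano()))
			prevNano = last
			continue
		}
		if outstandingPing && timeoutLeft <= 0 {
			fmt.Println("keepalive timeout: close the transport")
			return
		}
		if !outstandingPing {
			sendPing()
			timeoutLeft = kpTimeout
			outstandingPing = true
		}
		sleep := minTime(kpTime, timeoutLeft)
		timeoutLeft -= sleep
		prevNano = last
		timer.Reset(sleep)
	}
}

func main() {
	var lastRead int64
	go func() {
		time.Sleep(20 * time.Millisecond)
		atomic.StoreInt64(&lastRead, time.Now().UnixNano()) // reader goroutine records activity
	}()
	keepaliveLoop(&lastRead, 50*time.Millisecond, 30*time.Millisecond, func() { fmt.Println("ping") })
}
```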
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 4e26f6a..0760383 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -65,8 +65,7 @@
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
- ctxDone <-chan struct{} // Cache the context.Done() chan
- cancel context.CancelFunc
+ done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -175,6 +174,12 @@
Val: *config.MaxHeaderListSize,
})
}
+ if config.HeaderTableSize != nil {
+ isettings = append(isettings, http2.Setting{
+ ID: http2.SettingHeaderTableSize,
+ Val: *config.HeaderTableSize,
+ })
+ }
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
@@ -206,11 +211,10 @@
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
t := &http2Server{
- ctx: ctx,
- cancel: cancel,
- ctxDone: ctx.Done(),
+ ctx: context.Background(),
+ done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -231,7 +235,7 @@
czData: new(channelzData),
bufferPool: newBufferPool(),
}
- t.controlBuf = newControlBuffer(t.ctxDone)
+ t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -362,12 +366,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
+ s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -378,12 +384,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ s.cancel()
return true
}
t.maxStreamID = streamID
@@ -749,7 +757,7 @@
return true
}
-// WriteHeader sends the header metedata md back to the client.
+// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
@@ -885,7 +893,7 @@
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -907,7 +915,7 @@
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -973,7 +981,7 @@
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
- case <-t.ctx.Done():
+ case <-t.done:
}
return
case <-keepalive.C:
@@ -995,7 +1003,7 @@
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
- case <-t.ctx.Done():
+ case <-t.done:
return
}
}
@@ -1015,7 +1023,7 @@
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
- t.cancel()
+ close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1155,7 +1163,7 @@
select {
case <-t.drainChan:
case <-timer.C:
- case <-t.ctx.Done():
+ case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1205,7 +1213,7 @@
select {
case sz := <-resp:
return int64(sz)
- case <-t.ctxDone:
+ case <-t.done:
return -1
case <-timer.C:
return -2
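
On the server transport, the context/cancel pair existed only to broadcast shutdown, so it is replaced by a bare done channel that Close closes exactly once. A small, self-contained illustration of the pattern, not the vendored code:

```go
package main

import (
	"fmt"
	"time"
)

type serverTransport struct {
	done chan struct{} // closed exactly once in Close
}

func newServerTransport() *serverTransport {
	return &serverTransport{done: make(chan struct{})}
}

func (t *serverTransport) Close() {
	close(t.done) // every goroutine selecting on t.done unblocks
}

// waitForQuota stands in for the places (Write, keepalive, Drain) that used
// to select on t.ctx.Done() and now select on t.done.
func (t *serverTransport) waitForQuota(quota <-chan struct{}) error {
	select {
	case <-quota:
		return nil
	case <-t.done:
		return fmt.Errorf("transport closing")
	}
}

func main() {
	t := newServerTransport()
	go func() {
		time.Sleep(10 * time.Millisecond)
		t.Close()
	}()
	fmt.Println(t.waitForQuota(make(chan struct{})))
}
```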
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 1c1d106..bfab940 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -233,6 +233,7 @@
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
+ ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
@@ -251,6 +252,10 @@
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ // headerValid indicates whether a valid header was received. Only
+ // meaningful after headerChan is closed (always call waitOnHeader() before
+ // reading its value). Not valid on server side.
+ headerValid bool
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -303,34 +308,28 @@
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
-func (s *Stream) waitOnHeader() error {
+func (s *Stream) waitOnHeader() {
if s.headerChan == nil {
// On the server headerChan is always nil since a stream originates
// only after having received headers.
- return nil
+ return
}
select {
case <-s.ctx.Done():
- // We prefer success over failure when reading messages because we delay
- // context error in stream.Read(). To keep behavior consistent, we also
- // prefer success here.
- select {
- case <-s.headerChan:
- return nil
- default:
- }
- return ContextErr(s.ctx.Err())
+ // Close the stream to prevent headers/trailers from changing after
+ // this function returns.
+ s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
+ // headerChan could possibly not be closed yet if closeStream raced
+ // with operateHeaders; wait until it is closed explicitly here.
+ <-s.headerChan
case <-s.headerChan:
- return nil
}
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
- if err := s.waitOnHeader(); err != nil {
- return ""
- }
+ s.waitOnHeader()
return s.recvCompress
}
@@ -351,36 +350,27 @@
// available. It blocks until i) the metadata is ready or ii) there is no header
// metadata or iii) the stream is canceled/expired.
//
-// On server side, it returns the out header after t.WriteHeader is called.
+// On server side, it returns the out header after t.WriteHeader is called. It
+// does not block and must not be called until after WriteHeader.
func (s *Stream) Header() (metadata.MD, error) {
- if s.headerChan == nil && s.header != nil {
+ if s.headerChan == nil {
// On server side, return the header in stream. It will be the out
// header after t.WriteHeader is called.
return s.header.Copy(), nil
}
- err := s.waitOnHeader()
- // Even if the stream is closed, header is returned if available.
- select {
- case <-s.headerChan:
- if s.header == nil {
- return nil, nil
- }
- return s.header.Copy(), nil
- default:
+ s.waitOnHeader()
+ if !s.headerValid {
+ return nil, s.status.Err()
}
- return nil, err
+ return s.header.Copy(), nil
}
// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only. If the stream ends
-// before headers are received, returns true, nil. If a context error happens
-// first, returns it as a status error. Client-side only.
-func (s *Stream) TrailersOnly() (bool, error) {
- err := s.waitOnHeader()
- if err != nil {
- return false, err
- }
- return s.noHeaders, nil
+// before headers are received, returns true, nil. Client-side only.
+func (s *Stream) TrailersOnly() bool {
+ s.waitOnHeader()
+ return s.noHeaders
}
// Trailer returns the cached trailer metadata. Note that if it is not called
@@ -534,6 +524,7 @@
ReadBufferSize int
ChannelzParentID int64
MaxHeaderListSize *uint32
+ HeaderTableSize *uint32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
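
The new headerValid flag resolves the ambiguity of a closed headerChan: waiters always block until the channel is closed, then consult the flag to decide between returning the header and returning the stream's status error. A reduced, self-contained illustration of that idiom (not the vendored Stream type):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stream is an illustrative reduction of the client-side Stream: headerChan
// is closed exactly once, and headerValid is only meaningful after that close.
type stream struct {
	headerChan       chan struct{}
	headerChanClosed uint32
	headerValid      bool
	header           string
	err              error
}

// deliverHeaders models operateHeaders() receiving a usable HEADERS frame.
func (s *stream) deliverHeaders(h string) {
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.headerValid = true
		s.header = h
		close(s.headerChan)
	}
}

// fail models the stream ending (e.g. RST_STREAM or context cancellation)
// before valid headers arrive; headerValid stays false.
func (s *stream) fail(err error) {
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		s.err = err
		close(s.headerChan)
	}
}

// Header mirrors the new contract: always wait for headerChan, then use
// headerValid to choose between the header and the stream's error.
func (s *stream) Header() (string, error) {
	<-s.headerChan
	if !s.headerValid {
		return "", s.err
	}
	return s.header, nil
}

func main() {
	s := &stream{headerChan: make(chan struct{})}
	go s.deliverHeaders("content-type: application/grpc")
	fmt.Println(s.Header())

	s2 := &stream{headerChan: make(chan struct{})}
	go s2.fail(fmt.Errorf("stream terminated before headers"))
	fmt.Println(s2.Header())
}
```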
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index e83da34..4c5423b 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -21,6 +21,10 @@
package resolver
import (
+ "context"
+ "net"
+
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/serviceconfig"
)
@@ -86,9 +90,16 @@
// Type is the type of this address.
Type AddressType
// ServerName is the name of this address.
+ // If non-empty, the ServerName is used as the transport certification authority for
+ // the address, instead of the hostname from the Dial target string. In most cases,
+ // this should not be set.
//
- // e.g. if Type is GRPCLB, ServerName should be the name of the remote load
+ // If Type is GRPCLB, ServerName should be the name of the remote load
// balancer, not the name of the backend.
+ //
+ // WARNING: ServerName must only be populated with trusted values. It
+ // is insecure to populate it with data from untrusted inputs since untrusted
+ // values could be used to bypass the authority checks performed by TLS.
ServerName string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
@@ -98,18 +109,38 @@
// BuildOption includes additional information for the builder to create
// the resolver.
type BuildOption struct {
- // DisableServiceConfig indicates whether resolver should fetch service config data.
+ // DisableServiceConfig indicates whether a resolver implementation should
+ // fetch service config data.
DisableServiceConfig bool
+ // DialCreds is the transport credentials used by the ClientConn for
+ // communicating with the target gRPC service (set via
+ // WithTransportCredentials). In cases where a name resolution service
+ // requires the same credentials, the resolver may use this field. In most
+ // cases though, it is not appropriate, and this field may be ignored.
+ DialCreds credentials.TransportCredentials
+ // CredsBundle is the credentials bundle used by the ClientConn for
+ // communicating with the target gRPC service (set via
+ // WithCredentialsBundle). In cases where a name resolution service
+ // requires the same credentials, the resolver may use this field. In most
+ // cases though, it is not appropriate, and this field may be ignored.
+ CredsBundle credentials.Bundle
+ // Dialer is the custom dialer used by the ClientConn for dialling the
+ // target gRPC service (set via WithDialer). In cases where a name
+ // resolution service requires the same dialer, the resolver may use this
+ // field. In most cases though, it is not appropriate, and this field may
+ // be ignored.
+ Dialer func(context.Context, string) (net.Conn, error)
}
// State contains the current Resolver state relevant to the ClientConn.
type State struct {
- Addresses []Address // Resolved addresses for the target
- // ServiceConfig is the parsed service config; obtained from
- // serviceconfig.Parse.
- ServiceConfig serviceconfig.Config
+ // Addresses is the latest set of resolved addresses for the target.
+ Addresses []Address
- // TODO: add Err error
+ // ServiceConfig contains the result from parsing the latest service
+ // config. If it is nil, it indicates no service config is present or the
+ // resolver does not provide service configs.
+ ServiceConfig *serviceconfig.ParseResult
}
// ClientConn contains the callbacks for resolver to notify any updates
@@ -122,6 +153,10 @@
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
+ // ReportError notifies the ClientConn that the Resolver encountered an
+ // error. The ClientConn will notify the load balancer and begin calling
+ // ResolveNow on the Resolver with exponential backoff.
+ ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
@@ -133,6 +168,9 @@
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
+ // ParseServiceConfig parses the provided service config and returns an
+ // object that provides the parsed config.
+ ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
// Target represents a target for gRPC, as specified in:
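
Putting the resolver API changes together: BuildOption now carries the channel's credentials and dialer, State carries a *serviceconfig.ParseResult, and ClientConn gains ReportError and ParseServiceConfig. A hedged sketch of a custom resolver using this surface; the scheme, address, lookup helper, and service-config JSON are illustrative:

```go
package main

import (
	"google.golang.org/grpc/resolver"
)

type exampleBuilder struct{}

func (*exampleBuilder) Scheme() string { return "example" }

func (*exampleBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	// opts.DialCreds, opts.CredsBundle, and opts.Dialer are now available if
	// the name-resolution service itself needs the channel's credentials.
	r := &exampleResolver{cc: cc}
	go r.update()
	return r, nil
}

type exampleResolver struct {
	cc resolver.ClientConn
}

// lookup is a hypothetical name-service query.
func lookup() ([]resolver.Address, error) {
	return []resolver.Address{{Addr: "10.0.0.1:443"}}, nil
}

func (r *exampleResolver) update() {
	addrs, err := lookup()
	if err != nil {
		// New in this version: report failures so the ClientConn retries
		// ResolveNow with backoff.
		r.cc.ReportError(err)
		return
	}
	// Also new: service config is parsed by the ClientConn and carried as a
	// *serviceconfig.ParseResult inside resolver.State.
	sc := r.cc.ParseServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)
	r.cc.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
}

func (r *exampleResolver) ResolveNow(resolver.ResolveNowOption) { go r.update() }
func (r *exampleResolver) Close()                               {}

func main() {
	resolver.Register(&exampleBuilder{})
}
```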
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 6934905..7dcefcf 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -21,22 +21,29 @@
import (
"fmt"
"strings"
- "sync/atomic"
+ "sync"
+ "time"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
)
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConnection interface.
type ccResolverWrapper struct {
- cc *ClientConn
- resolver resolver.Resolver
- addrCh chan []resolver.Address
- scCh chan string
- done uint32 // accessed atomically; set to 1 when closed.
- curState resolver.State
+ cc *ClientConn
+ resolverMu sync.Mutex
+ resolver resolver.Resolver
+ done *grpcsync.Event
+ curState resolver.State
+
+ pollingMu sync.Mutex
+ polling chan struct{}
}
// split2 returns the values from strings.SplitN(s, sep, 2).
@@ -67,12 +74,9 @@
return ret
}
-// newCCResolverWrapper parses cc.target for scheme and gets the resolver
-// builder for this scheme and builds the resolver. The monitoring goroutine
-// for it is not started yet and can be created by calling start().
-//
-// If withResolverBuilder dial option is set, the specified resolver will be
-// used instead.
+// newCCResolverWrapper uses the resolver.Builder stored in the ClientConn to
+// build a Resolver and returns a ccResolverWrapper object which wraps the
+// newly built resolver.
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
rb := cc.dopts.resolverBuilder
if rb == nil {
@@ -80,47 +84,122 @@
}
ccr := &ccResolverWrapper{
- cc: cc,
- addrCh: make(chan []resolver.Address, 1),
- scCh: make(chan string, 1),
+ cc: cc,
+ done: grpcsync.NewEvent(),
+ }
+
+ var credsClone credentials.TransportCredentials
+ if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+ credsClone = creds.Clone()
+ }
+ rbo := resolver.BuildOption{
+ DisableServiceConfig: cc.dopts.disableServiceConfig,
+ DialCreds: credsClone,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
}
var err error
- ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
+ // We need to hold the lock here while we assign to the ccr.resolver field
+ // to guard against a data race caused by the following code path,
+ // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
+ // accessing ccr.resolver which is being assigned here.
+ ccr.resolverMu.Lock()
+ ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
if err != nil {
return nil, err
}
+ ccr.resolverMu.Unlock()
return ccr, nil
}
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
- ccr.resolver.ResolveNow(o)
+ ccr.resolverMu.Lock()
+ if !ccr.done.HasFired() {
+ ccr.resolver.ResolveNow(o)
+ }
+ ccr.resolverMu.Unlock()
}
func (ccr *ccResolverWrapper) close() {
+ ccr.resolverMu.Lock()
ccr.resolver.Close()
- atomic.StoreUint32(&ccr.done, 1)
+ ccr.done.Fire()
+ ccr.resolverMu.Unlock()
}
-func (ccr *ccResolverWrapper) isDone() bool {
- return atomic.LoadUint32(&ccr.done) == 1
+// poll begins or ends asynchronous polling of the resolver based on whether
+// err is ErrBadResolverState.
+func (ccr *ccResolverWrapper) poll(err error) {
+ ccr.pollingMu.Lock()
+ defer ccr.pollingMu.Unlock()
+ if err != balancer.ErrBadResolverState {
+ // stop polling
+ if ccr.polling != nil {
+ close(ccr.polling)
+ ccr.polling = nil
+ }
+ return
+ }
+ if ccr.polling != nil {
+ // already polling
+ return
+ }
+ p := make(chan struct{})
+ ccr.polling = p
+ go func() {
+ for i := 0; ; i++ {
+ ccr.resolveNow(resolver.ResolveNowOption{})
+ t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
+ select {
+ case <-p:
+ t.Stop()
+ return
+ case <-ccr.done.Done():
+ // Resolver has been closed.
+ t.Stop()
+ return
+ case <-t.C:
+ select {
+ case <-p:
+ return
+ default:
+ }
+ // Timer expired; re-resolve.
+ }
+ }
+ }()
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
- if ccr.isDone() {
+ if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
- ccr.cc.updateResolverState(s)
ccr.curState = s
+ ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
+}
+
+func (ccr *ccResolverWrapper) ReportError(err error) {
+ if ccr.done.HasFired() {
+ return
+ }
+ grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Resolver reported error: %v", err),
+ Severity: channelz.CtWarning,
+ })
+ }
+ ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
}
// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
- if ccr.isDone() {
+ if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
@@ -128,31 +207,49 @@
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
ccr.curState.Addresses = addrs
- ccr.cc.updateResolverState(ccr.curState)
+ ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
- if ccr.isDone() {
+ if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
- c, err := parseServiceConfig(sc)
- if err != nil {
+ scpr := parseServiceConfig(sc)
+ if scpr.Err != nil {
+ grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
+ Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
+ Severity: channelz.CtWarning,
+ })
+ }
+ ccr.poll(balancer.ErrBadResolverState)
return
}
if channelz.IsOn() {
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: c})
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
}
- ccr.curState.ServiceConfig = c
- ccr.cc.updateResolverState(ccr.curState)
+ ccr.curState.ServiceConfig = scpr
+ ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
+}
+
+func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
+ return parseServiceConfig(scJSON)
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
var updates []string
- oldSC, oldOK := ccr.curState.ServiceConfig.(*ServiceConfig)
- newSC, newOK := s.ServiceConfig.(*ServiceConfig)
+ var oldSC, newSC *ServiceConfig
+ var oldOK, newOK bool
+ if ccr.curState.ServiceConfig != nil {
+ oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
+ }
+ if s.ServiceConfig != nil {
+ newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
+ }
if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated")
}
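
When updateResolverState rejects an update with balancer.ErrBadResolverState, the wrapper now re-issues ResolveNow on a backoff loop until a later update is accepted. A condensed, self-contained sketch of that polling pattern; the backoff function below is a stand-in for the channel's configured resolve-now backoff, not the actual implementation:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc/balancer"
)

type poller struct {
	mu      sync.Mutex
	polling chan struct{}
}

// backoff is a placeholder for the channel's configured resolve-now backoff.
func backoff(retries int) time.Duration {
	if retries > 6 {
		return time.Second
	}
	return 10 * time.Millisecond << uint(retries)
}

// poll mirrors the shape of ccResolverWrapper.poll: start a retry goroutine
// while updates are rejected with ErrBadResolverState, stop once one is accepted.
func (p *poller) poll(err error, resolveNow func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if err != balancer.ErrBadResolverState {
		if p.polling != nil {
			close(p.polling) // a good update arrived: stop polling
			p.polling = nil
		}
		return
	}
	if p.polling != nil {
		return // already polling
	}
	stop := make(chan struct{})
	p.polling = stop
	go func() {
		for i := 0; ; i++ {
			resolveNow()
			t := time.NewTimer(backoff(i))
			select {
			case <-stop:
				t.Stop()
				return
			case <-t.C:
			}
		}
	}()
}

func main() {
	p := &poller{}
	p.poll(balancer.ErrBadResolverState, func() { fmt.Println("re-resolving") })
	time.Sleep(25 * time.Millisecond)
	p.poll(nil, nil) // a subsequent update was accepted; polling stops
	time.Sleep(5 * time.Millisecond)
}
```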
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 088c3f1..edaba79 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -648,35 +648,58 @@
return nil, st.Err()
}
+ var size int
if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
- if err != nil {
- return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
- }
+ size = len(d)
} else {
- dcReader, err := compressor.Decompress(bytes.NewReader(d))
- if err != nil {
- return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
- }
- // Read from LimitReader with limit max+1. So if the underlying
- // reader is over limit, the result will be bigger than max.
- d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
- if err != nil {
- return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
- }
+ d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
+ } else {
+ size = len(d)
}
- if len(d) > maxReceiveMessageSize {
+ if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
- return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
+ return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
return d, nil
}
+// Using compressor, decompress d, returning data and size.
+// Optionally, if data will be over maxReceiveMessageSize, just return the size.
+func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
+ dcReader, err := compressor.Decompress(bytes.NewReader(d))
+ if err != nil {
+ return nil, 0, err
+ }
+ if sizer, ok := compressor.(interface {
+ DecompressedSize(compressedBytes []byte) int
+ }); ok {
+ if size := sizer.DecompressedSize(d); size >= 0 {
+ if size > maxReceiveMessageSize {
+ return nil, size, nil
+ }
+ // size is used as an estimate to size the buffer, but we
+ // will read more data if available.
+ // +MinRead so ReadFrom will not reallocate if size is correct.
+ buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
+ bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
+ return buf.Bytes(), int(bytesRead), err
+ }
+ }
+ // Read from LimitReader with limit max+1. So if the underlying
+ // reader is over limit, the result will be bigger than max.
+ d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
+ return d, len(d), err
+}
+
// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
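
decompress now checks whether the registered compressor also exposes DecompressedSize, which lets it reject oversized messages before inflating them and pre-size the read buffer otherwise. A hedged sketch of a compressor wrapper satisfying that optional interface; decodedLen is a hypothetical helper for formats whose framing records the uncompressed length (snappy's block format, for example, does):

```go
package main

import (
	"google.golang.org/grpc/encoding"
)

// sizedCompressor wraps a registered compressor and adds size reporting.
type sizedCompressor struct {
	encoding.Compressor
}

// decodedLen is a stand-in for a format-specific way of reading the
// uncompressed size out of the compressed bytes.
func decodedLen(compressed []byte) (int, bool) { return 0, false }

// DecompressedSize satisfies the anonymous interface that decompress() now
// checks for:
//
//	interface{ DecompressedSize(compressedBytes []byte) int }
//
// A negative return means "unknown", in which case gRPC falls back to the
// LimitReader path shown above.
func (c sizedCompressor) DecompressedSize(compressed []byte) int {
	if n, ok := decodedLen(compressed); ok {
		return n
	}
	return -1
}

func main() {
	var _ interface {
		DecompressedSize(compressedBytes []byte) int
	} = sizedCompressor{}
}
```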
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index f064b73..e54083d 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -130,6 +130,7 @@
readBufferSize int
connectionTimeout time.Duration
maxHeaderListSize *uint32
+ headerTableSize *uint32
}
var defaultServerOptions = serverOptions{
@@ -377,6 +378,16 @@
})
}
+// HeaderTableSize returns a ServerOption that sets the size of dynamic
+// header table for stream.
+//
+// This API is EXPERIMENTAL.
+func HeaderTableSize(s uint32) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.headerTableSize = &s
+ })
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -686,6 +697,7 @@
ReadBufferSize: s.opts.readBufferSize,
ChannelzParentID: s.channelzID,
MaxHeaderListSize: s.opts.maxHeaderListSize,
+ HeaderTableSize: s.opts.headerTableSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
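
Usage sketch for the new HeaderTableSize server option wired through above: it sets the SETTINGS_HEADER_TABLE_SIZE the server advertises for HPACK decoding on each connection. The option is marked experimental; the listen address and the 4096 value here are just examples:

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	// Cap the dynamic HPACK header table advertised to clients.
	s := grpc.NewServer(grpc.HeaderTableSize(4096))
	if err := s.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
```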
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index d0787f1..4f8836d 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -261,20 +261,17 @@
}
func init() {
- internal.ParseServiceConfig = func(sc string) (interface{}, error) {
- return parseServiceConfig(sc)
- }
+ internal.ParseServiceConfigForTesting = parseServiceConfig
}
-
-func parseServiceConfig(js string) (*ServiceConfig, error) {
+func parseServiceConfig(js string) *serviceconfig.ParseResult {
if len(js) == 0 {
- return nil, fmt.Errorf("no JSON service config provided")
+ return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
}
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
- return nil, err
+ return &serviceconfig.ParseResult{Err: err}
}
sc := ServiceConfig{
LB: rsc.LoadBalancingPolicy,
@@ -288,7 +285,7 @@
if len(lbcfg) != 1 {
err := fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
grpclog.Warningf(err.Error())
- return nil, err
+ return &serviceconfig.ParseResult{Err: err}
}
var name string
var jsonCfg json.RawMessage
@@ -303,17 +300,25 @@
var err error
sc.lbConfig.cfg, err = parser.ParseConfig(jsonCfg)
if err != nil {
- return nil, fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
+ return &serviceconfig.ParseResult{Err: fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)}
}
} else if string(jsonCfg) != "{}" {
grpclog.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
}
break
}
+ if sc.lbConfig == nil {
+ // We had a loadBalancingConfig field but did not encounter a
+ // supported policy. The config is considered invalid in this
+ // case.
+ err := fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
+ grpclog.Warningf(err.Error())
+ return &serviceconfig.ParseResult{Err: err}
+ }
}
if rsc.MethodConfig == nil {
- return &sc, nil
+ return &serviceconfig.ParseResult{Config: &sc}
}
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
@@ -322,7 +327,7 @@
d, err := parseDuration(m.Timeout)
if err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
- return nil, err
+ return &serviceconfig.ParseResult{Err: err}
}
mc := MethodConfig{
@@ -331,7 +336,7 @@
}
if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
- return nil, err
+ return &serviceconfig.ParseResult{Err: err}
}
if m.MaxRequestMessageBytes != nil {
if *m.MaxRequestMessageBytes > int64(maxInt) {
@@ -356,13 +361,13 @@
if sc.retryThrottling != nil {
if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
- return nil, fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)
+ return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
}
if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
- return nil, fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)
+ return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
}
}
- return &sc, nil
+ return &serviceconfig.ParseResult{Config: &sc}
}
func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
diff --git a/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go b/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go
index 53b2787..187c304 100644
--- a/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go
+++ b/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go
@@ -22,27 +22,20 @@
// This package is EXPERIMENTAL.
package serviceconfig
-import (
- "google.golang.org/grpc/internal"
-)
-
// Config represents an opaque data structure holding a service config.
type Config interface {
- isConfig()
+ isServiceConfig()
}
// LoadBalancingConfig represents an opaque data structure holding a load
-// balancer config.
+// balancing config.
type LoadBalancingConfig interface {
isLoadBalancingConfig()
}
-// Parse parses the JSON service config provided into an internal form or
-// returns an error if the config is invalid.
-func Parse(ServiceConfigJSON string) (Config, error) {
- c, err := internal.ParseServiceConfig(ServiceConfigJSON)
- if err != nil {
- return nil, err
- }
- return c.(Config), err
+// ParseResult contains a service config or an error. Exactly one must be
+// non-nil.
+type ParseResult struct {
+ Config Config
+ Err error
}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 134a624..bb99940 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -488,7 +488,7 @@
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
- if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
+ if !cs.attempt.s.TrailersOnly() {
return err
}
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 5888505..40af096 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.23.1"
+const Version = "1.25.1"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
old mode 100755
new mode 100644
index 661e1e1..f324be5
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -31,12 +31,15 @@
if [[ "$1" = "-install" ]]; then
# Check for module support
if go help mod >& /dev/null; then
+ # Install the pinned versions as defined in module tools.
+ pushd ./test/tools
go install \
golang.org/x/lint/golint \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell \
github.com/golang/protobuf/protoc-gen-go
+ popd
else
# Ye olde `go get` incantation.
# Note: this gets the latest version of all tools (vs. the pinned versions
@@ -67,18 +70,21 @@
fi
# - Ensure all source files contain a copyright message.
-git ls-files "*.go" | xargs grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" 2>&1 | fail_on_output
+(! git grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" -- '*.go')
# - Make sure all tests in grpc and grpc/test use leakcheck via Teardown.
(! grep 'func Test[^(]' *_test.go)
(! grep 'func Test[^(]' test/*.go)
+# - Do not import x/net/context.
+(! git grep -l 'x/net/context' -- "*.go")
+
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
-git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand\|wrr_test')
+git grep -l '"math/rand"' -- "*.go" 2>&1 | (! grep -v '^examples\|^stress\|grpcrand\|wrr_test')
# - Ensure all ptypes proto packages are renamed when importing.
-git ls-files "*.go" | (! xargs grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/")
+(! git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go")
# - Check imports that are illegal in appengine (until Go 1.11).
# TODO: Remove when we drop Go 1.10 support
@@ -86,7 +92,7 @@
# - gofmt, goimports, golint (with exceptions for generated code), go vet.
gofmt -s -d -l . 2>&1 | fail_on_output
-goimports -l . 2>&1 | (! grep -vE "(_mock|\.pb)\.go:") | fail_on_output
+goimports -l . 2>&1 | (! grep -vE "(_mock|\.pb)\.go") | fail_on_output
golint ./... 2>&1 | (! grep -vE "(_mock|\.pb)\.go:")
go vet -all .
@@ -109,8 +115,10 @@
staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore '
google.golang.org/grpc/balancer.go:SA1019
google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
+google.golang.org/grpc/balancer/grpclb/grpclb_test.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
+google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019
google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019
google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
@@ -124,7 +132,7 @@
google.golang.org/grpc/examples/features/load_balancing/client/main.go:SA1019
google.golang.org/grpc/internal/transport/handler_server.go:SA1019
google.golang.org/grpc/internal/transport/handler_server_test.go:SA1019
-google.golang.org/grpc/resolver/dns/dns_resolver.go:SA1019
+google.golang.org/grpc/internal/resolver/dns/dns_resolver.go:SA1019
google.golang.org/grpc/stats/stats_test.go:SA1019
google.golang.org/grpc/test/balancer_test.go:SA1019
google.golang.org/grpc/test/channelz_test.go:SA1019