[VOL-2312] Logging - Integrate voltctl with the new etcd-based dynamic loglevel mechanism. Testing is in progress.
Change-Id: I2e13bb79008c9a49ebb6f58e575f51efebe6dbfd
diff --git a/vendor/google.golang.org/grpc/.travis.yml b/vendor/google.golang.org/grpc/.travis.yml
index 591343e..f0f723f 100644
--- a/vendor/google.golang.org/grpc/.travis.yml
+++ b/vendor/google.golang.org/grpc/.travis.yml
@@ -3,15 +3,15 @@
matrix:
include:
- go: 1.12.x
- env: GO111MODULE=on
- - go: 1.11.x
env: VET=1 GO111MODULE=on
- - go: 1.11.x
+ - go: 1.12.x
env: RACE=1 GO111MODULE=on
- - go: 1.11.x
+ - go: 1.12.x
env: RUN386=1
- - go: 1.11.x
+ - go: 1.12.x
env: GRPC_GO_RETRY=on
+ - go: 1.11.x
+ env: GO111MODULE=on
- go: 1.10.x
- go: 1.9.x
- go: 1.9.x
@@ -23,7 +23,7 @@
- if [[ "${GO111MODULE}" = "on" ]]; then mkdir "${HOME}/go"; export GOPATH="${HOME}/go"; fi
- if [[ -n "${RUN386}" ]]; then export GOARCH=386; fi
- if [[ "${TRAVIS_EVENT_TYPE}" = "cron" && -z "${RUN386}" ]]; then RACE=1; fi
- - if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then VET_SKIP_PROTO=1; fi
+ - if [[ "${TRAVIS_EVENT_TYPE}" != "cron" ]]; then export VET_SKIP_PROTO=1; fi
install:
- try3() { eval "$*" || eval "$*" || eval "$*"; }
diff --git a/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md b/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md
new file mode 100644
index 0000000..9d4213e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/CODE-OF-CONDUCT.md
@@ -0,0 +1,3 @@
+## Community Code of Conduct
+
+gRPC follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
diff --git a/vendor/google.golang.org/grpc/CONTRIBUTING.md b/vendor/google.golang.org/grpc/CONTRIBUTING.md
index 6e69b28..4f1567e 100644
--- a/vendor/google.golang.org/grpc/CONTRIBUTING.md
+++ b/vendor/google.golang.org/grpc/CONTRIBUTING.md
@@ -1,6 +1,8 @@
# How to contribute
-We definitely welcome your patches and contributions to gRPC!
+We definitely welcome your patches and contributions to gRPC! Please read the gRPC
+organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md)
+and [contribution guidelines](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md) before proceeding.
If you are new to github, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/)
diff --git a/vendor/google.golang.org/grpc/GOVERNANCE.md b/vendor/google.golang.org/grpc/GOVERNANCE.md
new file mode 100644
index 0000000..d6ff267
--- /dev/null
+++ b/vendor/google.golang.org/grpc/GOVERNANCE.md
@@ -0,0 +1 @@
+This repository is governed by the gRPC organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md).
diff --git a/vendor/google.golang.org/grpc/MAINTAINERS.md b/vendor/google.golang.org/grpc/MAINTAINERS.md
new file mode 100644
index 0000000..093c82b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/MAINTAINERS.md
@@ -0,0 +1,27 @@
+This page lists all active maintainers of this repository. If you were a
+maintainer and would like to add your name to the Emeritus list, please send us a
+PR.
+
+See [GOVERNANCE.md](https://github.com/grpc/grpc-community/blob/master/governance.md)
+for governance guidelines and how to become a maintainer.
+See [CONTRIBUTING.md](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md)
+for general contribution guidelines.
+
+## Maintainers (in alphabetical order)
+- [canguler](https://github.com/canguler), Google LLC
+- [cesarghali](https://github.com/cesarghali), Google LLC
+- [dfawley](https://github.com/dfawley), Google LLC
+- [easwars](https://github.com/easwars), Google LLC
+- [jadekler](https://github.com/jadekler), Google LLC
+- [menghanl](https://github.com/menghanl), Google LLC
+- [srini100](https://github.com/srini100), Google LLC
+
+## Emeritus Maintainers (in alphabetical order)
+- [adelez](https://github.com/adelez), Google LLC
+- [iamqizhao](https://github.com/iamqizhao), Google LLC
+- [jtattermusch](https://github.com/jtattermusch), Google LLC
+- [lyuxuan](https://github.com/lyuxuan), Google LLC
+- [makmukhi](https://github.com/makmukhi), Google LLC
+- [matt-kwong](https://github.com/matt-kwong), Google LLC
+- [nicolasnoble](https://github.com/nicolasnoble), Google LLC
+- [yongni](https://github.com/yongni), Google LLC
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
index a78e702..a8eb0f4 100644
--- a/vendor/google.golang.org/grpc/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer.go
@@ -43,7 +43,7 @@
// BalancerConfig specifies the configurations for Balancer.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerConfig struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial to a remote load balancer server. The Balancer implementations
@@ -57,7 +57,7 @@
// BalancerGetOptions configures a Get call.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerGetOptions struct {
// BlockingWait specifies whether Get should block when there is no
// connected address.
@@ -66,7 +66,7 @@
// Balancer chooses network addresses for RPCs.
//
-// Deprecated: please use package balancer.
+// Deprecated: please use package balancer. May be removed in a future 1.x release.
type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
// this function may start the name resolution and watch the updates. It will
@@ -120,7 +120,7 @@
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
-// Deprecated: please use package balancer/roundrobin.
+// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go
index 4b72daa..c266f4e 100644
--- a/vendor/google.golang.org/grpc/balancer/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/balancer.go
@@ -22,6 +22,7 @@
import (
"context"
+ "encoding/json"
"errors"
"net"
"strings"
@@ -31,6 +32,7 @@
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
)
var (
@@ -39,7 +41,10 @@
)
// Register registers the balancer builder to the balancer map. b.Name
-// (lowercased) will be used as the name registered with this builder.
+// (lowercased) will be used as the name registered with this builder. If the
+// Builder implements ConfigParser, ParseConfig will be called when new service
+// configs are received by the resolver, and the result will be provided to the
+// Balancer in UpdateClientConnState.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
@@ -172,6 +177,14 @@
Name() string
}
+// ConfigParser parses load balancer configs.
+type ConfigParser interface {
+ // ParseConfig parses the JSON load balancer config provided into an
+ // internal form or returns an error if the config is invalid. For future
+ // compatibility reasons, unknown fields in the config should be ignored.
+ ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
+}
+
// PickOptions contains addition information for the Pick operation.
type PickOptions struct {
// FullMethodName is the method name that NewClientStream() is called
@@ -270,7 +283,7 @@
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
- // UpdateResolverState will be called instead.
+ // UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
@@ -283,14 +296,23 @@
// TODO: add last connection error
}
+// ClientConnState describes the state of a ClientConn relevant to the
+// balancer.
+type ClientConnState struct {
+ ResolverState resolver.State
+ // The parsed load balancing configuration returned by the builder's
+ // ParseConfig method, if implemented.
+ BalancerConfig serviceconfig.LoadBalancingConfig
+}
+
// V2Balancer is defined for documentation purposes. If a Balancer also
-// implements V2Balancer, its UpdateResolverState method will be called instead
-// of HandleResolvedAddrs and its UpdateSubConnState will be called instead of
-// HandleSubConnStateChange.
+// implements V2Balancer, its UpdateClientConnState method will be called
+// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
+// instead of HandleSubConnStateChange.
type V2Balancer interface {
- // UpdateResolverState is called by gRPC when the state of the resolver
+ // UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
- UpdateResolverState(resolver.State)
+ UpdateClientConnState(ClientConnState)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index c5a51bd..1af88f0 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -70,13 +70,15 @@
panic("not implemented")
}
-func (b *baseBalancer) UpdateResolverState(s resolver.State) {
- // TODO: handle s.Err (log if not nil) once implemented.
- // TODO: handle s.ServiceConfig?
- grpclog.Infoln("base.baseBalancer: got new resolver state: ", s)
+func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
+ // TODO: handle s.ResolverState.Err (log if not nil) once implemented.
+ // TODO: handle s.ResolverState.ServiceConfig?
+ if grpclog.V(2) {
+ grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
+ }
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
- for _, a := range s.Addresses {
+ for _, a := range s.ResolverState.Addresses {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
@@ -127,10 +129,14 @@
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
- grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
+ }
oldS, ok := b.scStates[sc]
if !ok {
- grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
+ }
return
}
b.scStates[sc] = s
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index bc965f0..8df4095 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -88,7 +88,7 @@
cc *ClientConn
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
- resolverUpdateCh chan *resolver.State
+ ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
mu sync.Mutex
@@ -99,7 +99,7 @@
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
- resolverUpdateCh: make(chan *resolver.State, 1),
+ ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
@@ -126,7 +126,7 @@
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
- case s := <-ccb.resolverUpdateCh:
+ case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
@@ -134,9 +134,9 @@
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
- ub.UpdateResolverState(*s)
+ ub.UpdateClientConnState(*s)
} else {
- ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
+ ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
}
@@ -151,9 +151,11 @@
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
+ ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
+ ccb.cc.firstResolveEvent.Fire()
}
}
@@ -178,9 +180,10 @@
})
}
-func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
+func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
+ s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
@@ -191,10 +194,10 @@
}
}
select {
- case <-ccb.resolverUpdateCh:
+ case <-ccb.ccUpdateCh:
default:
}
- ccb.resolverUpdateCh <- &s
+ ccb.ccUpdateCh <- ccs
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 78e6d17..a7643df 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -38,13 +38,13 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+ "google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
)
@@ -532,24 +532,6 @@
}
}
-// gRPC should resort to default service config when:
-// * resolver service config is disabled
-// * or, resolver does not return a service config or returns an invalid one.
-func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
- if cc.dopts.disableServiceConfig {
- return true
- }
- // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
- // Right now, we assume that empty service config string means resolver does not return a config.
- if sc == "" {
- return true
- }
- // TODO: the logic below is temporary. Once we finish the logic to validate service config
- // in resolver, we will replace the logic below.
- _, err := parseServiceConfig(sc)
- return err != nil
-}
-
func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -560,44 +542,37 @@
return nil
}
- if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
+ if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
}
- } else {
- // TODO: the parsing logic below will be moved inside resolver.
- sc, err := parseServiceConfig(s.ServiceConfig)
- if err != nil {
- return err
- }
- if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
- cc.applyServiceConfig(sc)
- }
+ } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
+ cc.applyServiceConfig(sc)
}
- // update the service config that will be sent to balancer.
- if cc.sc != nil {
- s.ServiceConfig = cc.sc.rawJSONString
- }
-
+ var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
- var isGRPCLB bool
- for _, a := range s.Addresses {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
var newBalancerName string
- // TODO: use new loadBalancerConfig field with appropriate priority.
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
+ if cc.sc != nil && cc.sc.lbConfig != nil {
+ newBalancerName = cc.sc.lbConfig.name
+ balCfg = cc.sc.lbConfig.cfg
} else {
- newBalancerName = PickFirstBalancerName
+ var isGRPCLB bool
+ for _, a := range s.Addresses {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
+ }
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else if cc.sc != nil && cc.sc.LB != nil {
+ newBalancerName = *cc.sc.LB
+ } else {
+ newBalancerName = PickFirstBalancerName
+ }
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
@@ -607,8 +582,7 @@
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
- cc.balancerWrapper.updateResolverState(s)
- cc.firstResolveEvent.Fire()
+ cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
return nil
}
@@ -621,7 +595,7 @@
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
- if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+ if strings.EqualFold(cc.curBalancerName, name) {
return
}
@@ -760,6 +734,8 @@
ac.mu.Unlock()
return nil
}
+ // Update connectivity state within the lock to prevent subsequent or
+ // concurrent calls from resetting the transport more than once.
ac.updateConnectivityState(connectivity.Connecting)
ac.mu.Unlock()
@@ -770,7 +746,16 @@
// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
-// It checks whether current connected address of ac is in the new addrs list.
+// If ac is Connecting, it returns false. The caller should tear down the ac and
+// create a new one. Note that the backoff will be reset when this happens.
+//
+// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
+// addresses will be picked up by retry in the next iteration after backoff.
+//
+// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
+//
+// If ac is Ready, it checks whether current connected address of ac is in the
+// new addrs list.
// - If true, it updates ac.addrs and returns true. The ac will keep using
// the existing connection.
// - If false, it does nothing and returns false.
@@ -778,17 +763,18 @@
ac.mu.Lock()
defer ac.mu.Unlock()
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
- if ac.state == connectivity.Shutdown {
+ if ac.state == connectivity.Shutdown ||
+ ac.state == connectivity.TransientFailure ||
+ ac.state == connectivity.Idle {
ac.addrs = addrs
return true
}
- // Unless we're busy reconnecting already, let's reconnect from the top of
- // the list.
- if ac.state != connectivity.Ready {
+ if ac.state == connectivity.Connecting {
return false
}
+ // ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@@ -1037,6 +1023,9 @@
// The spec doesn't mention what should be done for multiple addresses.
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
connectDeadline := time.Now().Add(dialDuration)
+
+ ac.updateConnectivityState(connectivity.Connecting)
+ ac.transport = nil
ac.mu.Unlock()
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
@@ -1071,55 +1060,32 @@
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
- newTr.Close()
ac.mu.Unlock()
+ newTr.Close()
return
}
ac.curAddr = addr
ac.transport = newTr
ac.backoffIdx = 0
- healthCheckConfig := ac.cc.healthCheckConfig()
- // LB channel health checking is only enabled when all the four requirements below are met:
- // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
- // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
- // 3. a service config with non-empty healthCheckConfig field is provided,
- // 4. the current load balancer allows it.
hctx, hcancel := context.WithCancel(ac.ctx)
- healthcheckManagingState := false
- if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
- if ac.cc.dopts.healthCheckFunc == nil {
- // TODO: add a link to the health check doc in the error message.
- grpclog.Error("the client side LB channel health check function has not been set.")
- } else {
- // TODO(deklerk) refactor to just return transport
- go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
- healthcheckManagingState = true
- }
- }
- if !healthcheckManagingState {
- ac.updateConnectivityState(connectivity.Ready)
- }
+ ac.startHealthCheck(hctx)
ac.mu.Unlock()
// Block until the created transport is down. And when this happens,
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()
-
- // Need to reconnect after a READY, the addrConn enters
- // TRANSIENT_FAILURE.
+ // restart connecting - the top of the loop will set state to
+ // CONNECTING. This is against the current connectivity semantics doc,
+ // however it allows for graceful behavior for RPCs not yet dispatched
+ // - unfortunate timing would otherwise lead to the RPC failing even
+ // though the TRANSIENT_FAILURE state (called for by the doc) would be
+ // instantaneous.
//
- // This will set addrConn to TRANSIENT_FAILURE for a very short period
- // of time, and turns CONNECTING. It seems reasonable to skip this, but
- // READY-CONNECTING is not a valid transition.
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- ac.mu.Unlock()
- return
- }
- ac.updateConnectivityState(connectivity.TransientFailure)
- ac.mu.Unlock()
+ // Ideally we should transition to Idle here and block until there is
+ // RPC activity that leads to the balancer requesting a reconnect of
+ // the associated SubConn.
}
}
@@ -1133,8 +1099,6 @@
ac.mu.Unlock()
return nil, resolver.Address{}, nil, errConnClosing
}
- ac.updateConnectivityState(connectivity.Connecting)
- ac.transport = nil
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
@@ -1178,14 +1142,35 @@
Authority: ac.cc.authority,
}
+ once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
+ ac.mu.Lock()
+ once.Do(func() {
+ if ac.state == connectivity.Ready {
+ // Prevent this SubConn from being used for new RPCs by setting its
+ // state to Connecting.
+ //
+ // TODO: this should be Idle when grpc-go properly supports it.
+ ac.updateConnectivityState(connectivity.Connecting)
+ }
+ })
+ ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
@@ -1207,60 +1192,99 @@
return nil, nil, err
}
- if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
- select {
- case <-time.After(connectDeadline.Sub(time.Now())):
- // We didn't get the preface in time.
- newTr.Close()
- grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
- return nil, nil, errors.New("timed out waiting for server handshake")
- case <-prefaceReceived:
- // We got the preface - huzzah! things are good.
- case <-onCloseCalled:
- // The transport has already closed - noop.
- return nil, nil, errors.New("connection closed")
- // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
- }
+ select {
+ case <-time.After(connectDeadline.Sub(time.Now())):
+ // We didn't get the preface in time.
+ newTr.Close()
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
+ return nil, nil, errors.New("timed out waiting for server handshake")
+ case <-prefaceReceived:
+ // We got the preface - huzzah! things are good.
+ case <-onCloseCalled:
+ // The transport has already closed - noop.
+ return nil, nil, errors.New("connection closed")
+ // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
return newTr, reconnect, nil
}
-func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
- // Set up the health check helper functions
- newStream := func() (interface{}, error) {
- return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+// startHealthCheck starts the health checking stream (RPC) to watch the health
+// stats of this connection if health checking is requested and configured.
+//
+// LB channel health checking is enabled when all requirements below are met:
+// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
+// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
+// 3. a service config with non-empty healthCheckConfig field is provided
+// 4. the load balancer requests it
+//
+// It sets addrConn to READY if the health checking stream is not started.
+//
+// Caller must hold ac.mu.
+func (ac *addrConn) startHealthCheck(ctx context.Context) {
+ var healthcheckManagingState bool
+ defer func() {
+ if !healthcheckManagingState {
+ ac.updateConnectivityState(connectivity.Ready)
+ }
+ }()
+
+ if ac.cc.dopts.disableHealthCheck {
+ return
}
- firstReady := true
- reportHealth := func(ok bool) {
+ healthCheckConfig := ac.cc.healthCheckConfig()
+ if healthCheckConfig == nil {
+ return
+ }
+ if !ac.scopts.HealthCheckEnabled {
+ return
+ }
+ healthCheckFunc := ac.cc.dopts.healthCheckFunc
+ if healthCheckFunc == nil {
+ // The health package is not imported to set health check function.
+ //
+ // TODO: add a link to the health check doc in the error message.
+ grpclog.Error("Health check is requested but health check function is not set.")
+ return
+ }
+
+ healthcheckManagingState = true
+
+ // Set up the health check helper functions.
+ currentTr := ac.transport
+ newStream := func(method string) (interface{}, error) {
+ ac.mu.Lock()
+ if ac.transport != currentTr {
+ ac.mu.Unlock()
+ return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
+ }
+ ac.mu.Unlock()
+ return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
+ }
+ setConnectivityState := func(s connectivity.State) {
ac.mu.Lock()
defer ac.mu.Unlock()
- if ac.transport != newTr {
+ if ac.transport != currentTr {
return
}
- if ok {
- if firstReady {
- firstReady = false
- ac.curAddr = addr
- }
- ac.updateConnectivityState(connectivity.Ready)
- } else {
- ac.updateConnectivityState(connectivity.TransientFailure)
- }
+ ac.updateConnectivityState(s)
}
- err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
- if err != nil {
- if status.Code(err) == codes.Unimplemented {
- if channelz.IsOn() {
- channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
- Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
- Severity: channelz.CtError,
- })
+ // Start the health checking stream.
+ go func() {
+ err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
+ if err != nil {
+ if status.Code(err) == codes.Unimplemented {
+ if channelz.IsOn() {
+ channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+ Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
+ Severity: channelz.CtError,
+ })
+ }
+ grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+ } else {
+ grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
- grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
- } else {
- grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
}
- }
+ }()
}
func (ac *addrConn) resetConnectBackoff() {
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 88aff94..8ea3d4a 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -278,24 +278,22 @@
// TLSChannelzSecurityValue defines the struct that TLS protocol should return
// from GetSecurityValue(), containing security info like cipher and certificate used.
type TLSChannelzSecurityValue struct {
+ ChannelzSecurityValue
StandardName string
LocalCertificate []byte
RemoteCertificate []byte
}
-func (*TLSChannelzSecurityValue) isChannelzSecurityValue() {}
-
// OtherChannelzSecurityValue defines the struct that non-TLS protocol should return
// from GetSecurityValue(), which contains protocol specific security info. Note
// the Value field will be sent to users of channelz requesting channel info, and
// thus sensitive info should better be avoided.
type OtherChannelzSecurityValue struct {
+ ChannelzSecurityValue
Name string
Value proto.Message
}
-func (*OtherChannelzSecurityValue) isChannelzSecurityValue() {}
-
var cipherSuiteLookup = map[uint16]string{
tls.TLS_RSA_WITH_RC4_128_SHA: "TLS_RSA_WITH_RC4_128_SHA",
tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index 69c0031..e8f34d0 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -60,7 +60,6 @@
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
- reqHandshake envconfig.RequireHandshakeSetting
channelzParentID int64
disableServiceConfig bool
disableRetry bool
@@ -100,17 +99,6 @@
}
}
-// WithWaitForHandshake blocks until the initial settings frame is received from
-// the server before assigning RPCs to the connection.
-//
-// Deprecated: this is the default behavior, and this option will be removed
-// after the 1.18 release.
-func WithWaitForHandshake() DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.reqHandshake = envconfig.RequireHandshakeOn
- })
-}
-
// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is
@@ -156,7 +144,8 @@
// WithMaxMsgSize returns a DialOption which sets the maximum message size the
// client can receive.
//
-// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. Will
+// be supported throughout 1.x.
func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
}
@@ -172,7 +161,8 @@
// WithCodec returns a DialOption which sets a codec for message marshaling and
// unmarshaling.
//
-// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead.
+// Deprecated: use WithDefaultCallOptions(ForceCodec(_)) instead. Will be
+// supported throughout 1.x.
func WithCodec(c Codec) DialOption {
return WithDefaultCallOptions(CallCustomCodec(c))
}
@@ -181,7 +171,7 @@
// message compression. It has lower priority than the compressor set by the
// UseCompressor CallOption.
//
-// Deprecated: use UseCompressor instead.
+// Deprecated: use UseCompressor instead. Will be supported throughout 1.x.
func WithCompressor(cp Compressor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.cp = cp
@@ -196,7 +186,8 @@
// message. If no compressor is registered for the encoding, an Unimplemented
// status error will be returned.
//
-// Deprecated: use encoding.RegisterCompressor instead.
+// Deprecated: use encoding.RegisterCompressor instead. Will be supported
+// throughout 1.x.
func WithDecompressor(dc Decompressor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.dc = dc
@@ -207,7 +198,7 @@
// Name resolver will be ignored if this DialOption is specified.
//
// Deprecated: use the new balancer APIs in balancer package and
-// WithBalancerName.
+// WithBalancerName. Will be removed in a future 1.x release.
func WithBalancer(b Balancer) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
@@ -223,7 +214,8 @@
// The balancer cannot be overridden by balancer option specified by service
// config.
//
-// This is an EXPERIMENTAL API.
+// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig
+// instead. Will be removed in a future 1.x release.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
@@ -244,9 +236,10 @@
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
-// Deprecated: service config should be received through name resolver, as
-// specified here.
-// https://github.com/grpc/grpc/blob/master/doc/service_config.md
+// Deprecated: service config should be received through name resolver or via
+// WithDefaultServiceConfig, as specified at
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be
+// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.scChan = c
@@ -329,7 +322,8 @@
// WithTimeout returns a DialOption that configures a timeout for dialing a
// ClientConn initially. This is valid if and only if WithBlock() is present.
//
-// Deprecated: use DialContext and context.WithTimeout instead.
+// Deprecated: use DialContext and context.WithTimeout instead. Will be
+// supported throughout 1.x.
func WithTimeout(d time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.timeout = d
@@ -356,7 +350,8 @@
// is returned by f, gRPC checks the error's Temporary() method to decide if it
// should try to reconnect to the network address.
//
-// Deprecated: use WithContextDialer instead
+// Deprecated: use WithContextDialer instead. Will be supported throughout
+// 1.x.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return WithContextDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
@@ -480,8 +475,10 @@
// WithDefaultServiceConfig returns a DialOption that configures the default
// service config, which will be used in cases where:
-// 1. WithDisableServiceConfig is called.
-// 2. Resolver does not return service config or if the resolver gets and invalid config.
+//
+// 1. WithDisableServiceConfig is also used.
+// 2. Resolver does not return a service config or if the resolver returns an
+// invalid service config.
//
// This API is EXPERIMENTAL.
func WithDefaultServiceConfig(s string) DialOption {
@@ -537,7 +534,6 @@
func defaultDialOptions() dialOptions {
return dialOptions{
disableRetry: !envconfig.Retry,
- reqHandshake: envconfig.RequireHandshake,
healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod
index b75c069..c7f3fa3 100644
--- a/vendor/google.golang.org/grpc/go.mod
+++ b/vendor/google.golang.org/grpc/go.mod
@@ -1,20 +1,21 @@
module google.golang.org/grpc
+go 1.11
+
require (
cloud.google.com/go v0.26.0 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/client9/misspell v0.3.4
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/mock v1.1.1
- github.com/golang/protobuf v1.2.0
+ github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.2.0
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3
golang.org/x/net v0.0.0-20190311183353-d8887717615a
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
- golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
- golang.org/x/tools v0.0.0-20190311212946-11955173bddd
+ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135
google.golang.org/appengine v1.1.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8
- honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099
+ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc
)
diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum
index 2a17234..7faff49 100644
--- a/vendor/google.golang.org/grpc/go.sum
+++ b/vendor/google.golang.org/grpc/go.sum
@@ -8,8 +8,8 @@
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
-github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
-github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -19,17 +19,19 @@
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
-honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099 h1:XJP7lxbSxWLOMNdBE4B/STaqVy6L73o0knwj2vIlxnw=
-honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 11be7cd..3ee8740 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,40 +25,11 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
- requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
-)
-
-// RequireHandshakeSetting describes the settings for handshaking.
-type RequireHandshakeSetting int
-
-const (
- // RequireHandshakeOn indicates to wait for handshake before considering a
- // connection ready/successful.
- RequireHandshakeOn RequireHandshakeSetting = iota
- // RequireHandshakeOff indicates to not wait for handshake before
- // considering a connection ready/successful.
- RequireHandshakeOff
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
- // RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
- // environment variable.
- //
- // Will be removed after the 1.18 release.
- RequireHandshake = RequireHandshakeOn
)
-
-func init() {
- switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
- case "on":
- fallthrough
- default:
- RequireHandshake = RequireHandshakeOn
- case "off":
- RequireHandshake = RequireHandshakeOff
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index c1d2c69..bc1f99a 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -23,6 +23,8 @@
import (
"context"
"time"
+
+ "google.golang.org/grpc/connectivity"
)
var (
@@ -37,10 +39,25 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
+ // ParseServiceConfig is a function to parse JSON service configs into
+ // opaque data structures.
+ ParseServiceConfig func(sc string) (interface{}, error)
+ // StatusRawProto is exported by status/status.go. This func returns a
+ // pointer to the wrapped Status proto for a given status.Status without a
+ // call to proto.Clone(). The returned Status proto should not be mutated by
+ // the caller.
+ StatusRawProto interface{} // func (*status.Status) *spb.Status
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
-type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+//
+// The implementation is expected to create a health checking RPC stream by
+// calling newStream(), watch for the health status of serviceName, and report
+// its health back by calling setConnectivityState().
+//
+// The health checking protocol is defined at:
+// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
+type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 204ba15..ddee20b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -23,6 +23,7 @@
"fmt"
"runtime"
"sync"
+ "sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -84,24 +85,40 @@
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
+// maxQueuedTransportResponseFrames is the most queued "transport response"
+// frames we will buffer before preventing new reads from occurring on the
+// transport. These are control frames sent in response to client requests,
+// such as RST_STREAM due to bad headers or settings acks.
+const maxQueuedTransportResponseFrames = 50
+
+type cbItem interface {
+ isTransportResponseFrame() bool
+}
+
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
+func (*registerStream) isTransportResponseFrame() bool { return false }
+
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) (bool, error) // Used only on the client side.
+ endStream bool // Valid on server side.
+ initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
+func (h *headerFrame) isTransportResponseFrame() bool {
+ return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
+}
+
type cleanupStream struct {
streamID uint32
rst bool
@@ -109,6 +126,8 @@
onWrite func()
}
+func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
+
type dataFrame struct {
streamID uint32
endStream bool
@@ -119,27 +138,41 @@
onEachWrite func()
}
+func (*dataFrame) isTransportResponseFrame() bool { return false }
+
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
+
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
+ return false // window updates are throttled by thresholds
+}
+
type incomingSettings struct {
ss []http2.Setting
}
+func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
+
type outgoingSettings struct {
ss []http2.Setting
}
+func (*outgoingSettings) isTransportResponseFrame() bool { return false }
+
type incomingGoAway struct {
}
+func (*incomingGoAway) isTransportResponseFrame() bool { return false }
+
type goAway struct {
code http2.ErrCode
debugData []byte
@@ -147,15 +180,21 @@
closeConn bool
}
+func (*goAway) isTransportResponseFrame() bool { return false }
+
type ping struct {
ack bool
data [8]byte
}
+func (*ping) isTransportResponseFrame() bool { return true }
+
type outFlowControlSizeRequest struct {
resp chan uint32
}
+func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
+
type outStreamState int
const (
@@ -238,6 +277,14 @@
consumerWaiting bool
list *itemList
err error
+
+ // transportResponseFrames counts the number of queued items that represent
+ // the response of an action initiated by the peer. trfChan is created
+ // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
+ // closed and nilled when transportResponseFrames drops below the
+ // threshold. Both fields are protected by mu.
+ transportResponseFrames int
+ trfChan atomic.Value // *chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -248,12 +295,24 @@
}
}
-func (c *controlBuffer) put(it interface{}) error {
+// throttle blocks if there are too many incomingSettings/cleanupStreams in the
+// controlbuf.
+func (c *controlBuffer) throttle() {
+ ch, _ := c.trfChan.Load().(*chan struct{})
+ if ch != nil {
+ select {
+ case <-*ch:
+ case <-c.done:
+ }
+ }
+}
+
+func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -271,6 +330,15 @@
c.consumerWaiting = false
}
c.list.enqueue(it)
+ if it.isTransportResponseFrame() {
+ c.transportResponseFrames++
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are adding the frame that puts us over the threshold; create
+ // a throttling channel.
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
+ }
+ }
c.mu.Unlock()
if wakeUp {
select {
@@ -304,7 +372,17 @@
return nil, c.err
}
if !c.list.isEmpty() {
- h := c.list.dequeue()
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Load().(*chan struct{})
+ close(*ch)
+ c.trfChan.Store((*chan struct{})(nil))
+ }
+ c.transportResponseFrames--
+ }
c.mu.Unlock()
return h, nil
}
@@ -559,21 +637,17 @@
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
- sendPing, err := hdr.initStream(str.id)
- if err != nil {
+ if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
- if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
- if sendPing {
- return l.pingHandler(&ping{data: [8]byte{}})
- }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index 5ea997a..f262edd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -149,6 +149,7 @@
n = uint32(math.MaxInt32)
}
f.mu.Lock()
+ defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -169,10 +170,8 @@
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
- f.mu.Unlock()
return f.delta
}
- f.mu.Unlock()
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index f2de84d..78f9ddc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -24,6 +24,7 @@
package transport
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -347,7 +348,7 @@
ht.stats.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
- reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
+ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
@@ -361,7 +362,7 @@
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
- s.buf.put(recvMsg{data: buf[:n:n]})
+ s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
}
if err != nil {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 91e446f..9bd8c27 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -62,8 +62,6 @@
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -110,6 +108,16 @@
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // A condition variable used to signal when the keepalive goroutine should
+ // go dormant. The condition for dormancy is based on the number of active
+ // streams and the `PermitWithoutStream` keepalive client parameter. And
+ // since the number of active streams is guarded by the above mutex, we use
+ // the same for this condition variable as well.
+ kpDormancyCond *sync.Cond
+ // A boolean to track whether the keepalive goroutine is dormant or not.
+ // This is checked before attempting to signal the above condition
+ // variable.
+ kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -117,6 +125,8 @@
onGoAway func(GoAwayReason)
onClose func()
+
+ bufferPool *bufferPool
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -230,7 +240,6 @@
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -249,6 +258,7 @@
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
+ bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@@ -261,9 +271,6 @@
updateFlowControl: t.updateFlowControl,
}
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -278,6 +285,7 @@
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
+ t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -367,6 +375,7 @@
closeStream: func(err error) {
t.CloseStream(s, err)
},
+ freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -437,6 +446,15 @@
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
var k string
+ for k, vv := range md {
+ // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, v := range vv {
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+ }
+ }
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
@@ -450,15 +468,6 @@
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
}
}
- for k, vv := range md {
- // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
@@ -489,6 +498,9 @@
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
+ if len(t.perRPCCreds) == 0 {
+ return nil, nil
+ }
authData := map[string]string{}
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
@@ -509,7 +521,7 @@
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
- callAuthData := map[string]string{}
+ var callAuthData map[string]string
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
@@ -521,6 +533,7 @@
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
+ callAuthData = make(map[string]string, len(data))
for k, v := range data {
// Capital header names are illegal in HTTP/2
k = strings.ToLower(k)
@@ -549,15 +562,14 @@
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
- if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
-
}
hdr := &headerFrame{
hf: headerFields,
endStream: false,
- initStream: func(id uint32) (bool, error) {
+ initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -567,29 +579,19 @@
err = ErrConnClosing
}
cleanup(err)
- return false, err
+ return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
- var sendPing bool
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 && t.keepaliveEnabled {
- select {
- case t.awakenKeepalive <- struct{}{}:
- sendPing = true
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
- }
+ // If the keepalive goroutine has gone dormant, wake it up.
+ if t.kpDormant {
+ t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- return sendPing, nil
+ return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -713,7 +715,7 @@
s.write(recvMsg{err: err})
}
// If headerChan isn't closed, then close it.
- if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
close(s.headerChan)
}
@@ -765,9 +767,17 @@
t.mu.Unlock()
return nil
}
+ // Call t.onClose before setting the state to closing to prevent the client
+ // from attempting to create new streams ASAP.
+ t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
+ if t.kpDormant {
+ // If the keepalive goroutine is blocked on this condition variable, we
+ // should unblock it so that the goroutine eventually exits.
+ t.kpDormancyCond.Signal()
+ }
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@@ -785,7 +795,6 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- t.onClose()
return err
}
@@ -844,11 +853,11 @@
return t.controlBuf.put(df)
}
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
+func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
+ s := t.activeStreams[f.Header().StreamID]
+ t.mu.Unlock()
+ return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -928,8 +937,8 @@
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if size > 0 {
@@ -946,9 +955,10 @@
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ buffer := t.bufferPool.get()
+ buffer.Reset()
+ buffer.Write(f.Data())
+ s.write(recvMsg{buffer: buffer})
}
}
// The server has closed the stream without sending trailers. Record that
@@ -959,8 +969,8 @@
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -973,9 +983,9 @@
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
- // Our deadline was already exceeded, and that was likely the cause of
- // this cancelation. Alter the status code accordingly.
- if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
+ if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
+ // Our deadline was already exceeded, and that was likely the cause
+ // of this cancelation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1080,11 +1090,12 @@
default:
t.setGoAwayReason(f)
close(t.goAway)
- t.state = draining
t.controlBuf.put(&incomingGoAway{})
-
- // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
+ // Notify the clientconn about the GOAWAY before we set the state to
+ // draining, to allow the client to stop attempting to create streams
+ // before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
+ t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1136,32 +1147,30 @@
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
+ s := t.getStream(frame)
+ if s == nil {
return
}
endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
- initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0
+ initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
if !initialHeader && !endStream {
- // As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear
- // at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
+ // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, the second HEADERS frame must have the EOS bit set.
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
return
}
state := &decodeState{}
- // Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received
- // which indicates peer speaking gRPC, we are in gRPC mode.
+ // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
- var isHeader bool
+ isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
@@ -1180,10 +1189,10 @@
}
}()
- // If headers haven't been received yet.
- if initialHeader {
+ // If headerChan hasn't been closed yet
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
if !endStream {
- // Headers frame is ResponseHeader.
+ // HEADERS frame block carries a Response-Headers.
isHeader = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
@@ -1192,14 +1201,17 @@
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
}
- close(s.headerChan)
- return
+ } else {
+ // HEADERS frame block carries a Trailers-Only.
+ s.noHeaders = true
}
- // Headers frame is Trailers-only.
- s.noHeaders = true
close(s.headerChan)
}
+ if !endStream {
+ return
+ }
+
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
@@ -1233,6 +1245,7 @@
// loop to keep reading incoming messages on this transport.
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
@@ -1290,29 +1303,32 @@
timer.Reset(t.kp.Time)
continue
}
- // Check if keepalive should go dormant.
t.mu.Lock()
- if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
+ if t.state == closing {
+ // If the transport is closing, we should exit from the
+ // keepalive goroutine here. If not, we could have a race
+ // between the call to Signal() from Close() and the call to
+ // Wait() here, whereby the keepalive goroutine ends up
+ // blocking on the condition variable which will never be
+ // signalled again.
t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.ctx.Done():
- return
- }
- } else {
- t.mu.Unlock()
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
- }
- // Send ping.
- t.controlBuf.put(p)
+ return
}
+ if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+ t.kpDormant = true
+ t.kpDormancyCond.Wait()
+ }
+ t.kpDormant = false
+ t.mu.Unlock()
- // By the time control gets here a ping has been sent one way or the other.
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
+ }
+ // We get here either because we were dormant and a new stream was
+ // created which unblocked the Wait() call, or because the
+ // keepalive timer expired. In both cases, we need to send a ping.
+ t.controlBuf.put(p)
+
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
@@ -1320,6 +1336,7 @@
timer.Reset(t.kp.Time)
continue
}
+ infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 435092e..33686a1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,9 +35,11 @@
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@@ -55,13 +57,15 @@
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+ // statusRawProto is a function to get to the raw status proto wrapped in a
+ // status.Status without a proto.Clone().
+ statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
- ctxDone <-chan struct{} // Cache the context.Done() chan
- cancel context.CancelFunc
+ done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -119,6 +123,7 @@
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czData *channelzData
+ bufferPool *bufferPool
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -132,7 +137,10 @@
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
- var isettings []http2.Setting
+ isettings := []http2.Setting{{
+ ID: http2.SettingMaxFrameSize,
+ Val: http2MaxFrameLen,
+ }}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@@ -197,11 +205,10 @@
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
t := &http2Server{
- ctx: ctx,
- cancel: cancel,
- ctxDone: ctx.Done(),
+ ctx: context.Background(),
+ done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -220,8 +227,9 @@
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
+ bufferPool: newBufferPool(),
}
- t.controlBuf = newControlBuffer(t.ctxDone)
+ t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -352,12 +360,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
+ s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -368,12 +378,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ s.cancel()
return true
}
t.maxStreamID = streamID
@@ -405,9 +417,10 @@
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- ctxDone: s.ctxDone,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
+ freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -428,6 +441,7 @@
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
@@ -591,9 +605,10 @@
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ buffer := t.bufferPool.get()
+ buffer.Reset()
+ buffer.Write(f.Data())
+ s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
@@ -757,6 +772,10 @@
return nil
}
+func (t *http2Server) setResetPingStrikes() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+}
+
func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
@@ -771,9 +790,7 @@
streamID: s.id,
hf: headerFields,
endStream: false,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
})
if !success {
if err != nil {
@@ -817,7 +834,7 @@
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
- if p := st.Proto(); p != nil && len(p.Details) > 0 {
+ if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@@ -833,9 +850,7 @@
streamID: s.id,
hf: headerFields,
endStream: true,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
@@ -872,7 +887,7 @@
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -887,16 +902,14 @@
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
- streamID: s.id,
- h: hdr,
- d: data,
- onEachWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -958,10 +971,11 @@
select {
case <-maxAge.C:
// Close the connection after grace period.
+ infof("transport: closing server transport due to maximum connection age.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
- case <-t.ctx.Done():
+ case <-t.done:
}
return
case <-keepalive.C:
@@ -971,6 +985,7 @@
continue
}
if pingSent {
+ infof("transport: closing server transport due to idleness.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
@@ -982,7 +997,7 @@
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
- case <-t.ctx.Done():
+ case <-t.done:
return
}
}
@@ -1002,7 +1017,7 @@
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
- t.cancel()
+ close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1019,13 +1034,7 @@
}
// deleteStream deletes the stream s from transport's active streams.
-func (t *http2Server) deleteStream(s *Stream, eosReceived bool) (oldState streamState) {
- oldState = s.swapState(streamDone)
- if oldState == streamDone {
- // If the stream was already done, return.
- return oldState
- }
-
+func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
@@ -1047,15 +1056,13 @@
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
}
-
- return oldState
}
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
- oldState := t.deleteStream(s, eosReceived)
- // If the stream is already closed, then don't put trailing header to controlbuf.
+ oldState := s.swapState(streamDone)
if oldState == streamDone {
+ // If the stream was already done, return.
return
}
@@ -1063,14 +1070,18 @@
streamID: s.id,
rst: rst,
rstCode: rstCode,
- onWrite: func() {},
+ onWrite: func() {
+ t.deleteStream(s, eosReceived)
+ },
}
t.controlBuf.put(hdr)
}
// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+ s.swapState(streamDone)
t.deleteStream(s, eosReceived)
+
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: rst,
@@ -1146,7 +1157,7 @@
select {
case <-t.drainChan:
case <-timer.C:
- case <-t.ctx.Done():
+ case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1196,7 +1207,7 @@
select {
case sz := <-resp:
return int64(sz)
- case <-t.ctxDone:
+ case <-t.done:
return -1
case <-timer.C:
return -2
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 9d21286..8f5f334 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -667,6 +667,7 @@
writer: w,
fr: http2.NewFramer(w, r),
}
+ f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 846147a..1c1d106 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -22,6 +22,7 @@
package transport
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -39,10 +40,32 @@
"google.golang.org/grpc/tap"
)
+type bufferPool struct {
+ pool sync.Pool
+}
+
+func newBufferPool() *bufferPool {
+ return &bufferPool{
+ pool: sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+ },
+ }
+}
+
+func (p *bufferPool) get() *bytes.Buffer {
+ return p.pool.Get().(*bytes.Buffer)
+}
+
+func (p *bufferPool) put(b *bytes.Buffer) {
+ p.pool.Put(b)
+}
+
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
- data []byte
+ buffer *bytes.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@@ -117,8 +140,9 @@
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
- last []byte // Stores the remaining data in the previous calls.
+ last *bytes.Buffer // Stores the remaining data in the previous calls.
err error
+ freeBuffer func(*bytes.Buffer)
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -128,10 +152,13 @@
if r.err != nil {
return 0, r.err
}
- if r.last != nil && len(r.last) > 0 {
+ if r.last != nil {
// Read remaining data left in last call.
- copied := copy(p, r.last)
- r.last = r.last[copied:]
+ copied, _ := r.last.Read(p)
+ if r.last.Len() == 0 {
+ r.freeBuffer(r.last)
+ r.last = nil
+ }
return copied, nil
}
if r.closeStream != nil {
@@ -157,6 +184,19 @@
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
+ // Note that this adds the ctx error to the end of recv buffer, and
+ // reads from the head. This will delay the error until recv buffer is
+ // empty, thus will delay ctx cancellation in Recv().
+ //
+ // It's done this way to fix a race between ctx cancel and trailer. The
+ // race was, stream.Recv() may return ctx error if ctxDone wins the
+ // race, but stream.Trailer() may return a non-nil md because the stream
+ // was not marked as done when trailer is received. This closeStream
+ // call will mark stream as done, thus fix the race.
+ //
+ // TODO: delaying ctx error seems like a unnecessary side effect. What
+ // we really want is to mark the stream as done, and return ctx error
+ // faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
@@ -170,8 +210,13 @@
if m.err != nil {
return 0, m.err
}
- copied := copy(p, m.data)
- r.last = m.data[copied:]
+ copied, _ := m.buffer.Read(p)
+ if m.buffer.Len() == 0 {
+ r.freeBuffer(m.buffer)
+ r.last = nil
+ } else {
+ r.last = m.buffer
+ }
return copied, nil
}
@@ -204,8 +249,8 @@
// is used to adjust flow control, if needed.
requestRead func(int)
- headerChan chan struct{} // closed to indicate the end of header metadata.
- headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ headerChan chan struct{} // closed to indicate the end of header metadata.
+ headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -266,6 +311,14 @@
}
select {
case <-s.ctx.Done():
+ // We prefer success over failure when reading messages because we delay
+ // context error in stream.Read(). To keep behavior consistent, we also
+ // prefer success here.
+ select {
+ case <-s.headerChan:
+ return nil
+ default:
+ }
return ContextErr(s.ctx.Err())
case <-s.headerChan:
return nil
diff --git a/vendor/google.golang.org/grpc/naming/naming.go b/vendor/google.golang.org/grpc/naming/naming.go
index c99fdbe..f4c1c8b 100644
--- a/vendor/google.golang.org/grpc/naming/naming.go
+++ b/vendor/google.golang.org/grpc/naming/naming.go
@@ -17,9 +17,8 @@
*/
// Package naming defines the naming API and related data structures for gRPC.
-// The interface is EXPERIMENTAL and may be subject to change.
//
-// Deprecated: please use package resolver.
+// This package is deprecated: please use package resolver instead.
package naming
// Operation defines the corresponding operations for a name resolution change.
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index d1e38aa..ed05b02 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -51,14 +51,18 @@
func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
- grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
+ }
return
}
if b.sc == nil {
b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
//TODO(yuxuanli): why not change the cc state to Idle?
- grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
+ if grpclog.V(2) {
+ grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
+ }
return
}
b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc})
@@ -70,9 +74,13 @@
}
func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s)
+ }
if b.sc != sc {
- grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
+ if grpclog.V(2) {
+ grpclog.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
+ }
return
}
if s == connectivity.Shutdown {
diff --git a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection.pb.go b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection.pb.go
index ae5aa7d..0a12ad2 100644
--- a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection.pb.go
+++ b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection.pb.go
@@ -3,13 +3,14 @@
package grpc_reflection_v1alpha
-import proto "github.com/golang/protobuf/proto"
-import fmt "fmt"
-import math "math"
-
import (
- context "golang.org/x/net/context"
+ context "context"
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+ math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -21,7 +22,7 @@
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
-const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// The message sent by the client when calling ServerReflectionInfo method.
type ServerReflectionRequest struct {
@@ -46,16 +47,17 @@
func (m *ServerReflectionRequest) String() string { return proto.CompactTextString(m) }
func (*ServerReflectionRequest) ProtoMessage() {}
func (*ServerReflectionRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{0}
+ return fileDescriptor_42a8ac412db3cb03, []int{0}
}
+
func (m *ServerReflectionRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ServerReflectionRequest.Unmarshal(m, b)
}
func (m *ServerReflectionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ServerReflectionRequest.Marshal(b, m, deterministic)
}
-func (dst *ServerReflectionRequest) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ServerReflectionRequest.Merge(dst, src)
+func (m *ServerReflectionRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ServerReflectionRequest.Merge(m, src)
}
func (m *ServerReflectionRequest) XXX_Size() int {
return xxx_messageInfo_ServerReflectionRequest.Size(m)
@@ -149,9 +151,9 @@
return ""
}
-// XXX_OneofFuncs is for the internal use of the proto package.
-func (*ServerReflectionRequest) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
- return _ServerReflectionRequest_OneofMarshaler, _ServerReflectionRequest_OneofUnmarshaler, _ServerReflectionRequest_OneofSizer, []interface{}{
+// XXX_OneofWrappers is for the internal use of the proto package.
+func (*ServerReflectionRequest) XXX_OneofWrappers() []interface{} {
+ return []interface{}{
(*ServerReflectionRequest_FileByFilename)(nil),
(*ServerReflectionRequest_FileContainingSymbol)(nil),
(*ServerReflectionRequest_FileContainingExtension)(nil),
@@ -160,110 +162,6 @@
}
}
-func _ServerReflectionRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
- m := msg.(*ServerReflectionRequest)
- // message_request
- switch x := m.MessageRequest.(type) {
- case *ServerReflectionRequest_FileByFilename:
- b.EncodeVarint(3<<3 | proto.WireBytes)
- b.EncodeStringBytes(x.FileByFilename)
- case *ServerReflectionRequest_FileContainingSymbol:
- b.EncodeVarint(4<<3 | proto.WireBytes)
- b.EncodeStringBytes(x.FileContainingSymbol)
- case *ServerReflectionRequest_FileContainingExtension:
- b.EncodeVarint(5<<3 | proto.WireBytes)
- if err := b.EncodeMessage(x.FileContainingExtension); err != nil {
- return err
- }
- case *ServerReflectionRequest_AllExtensionNumbersOfType:
- b.EncodeVarint(6<<3 | proto.WireBytes)
- b.EncodeStringBytes(x.AllExtensionNumbersOfType)
- case *ServerReflectionRequest_ListServices:
- b.EncodeVarint(7<<3 | proto.WireBytes)
- b.EncodeStringBytes(x.ListServices)
- case nil:
- default:
- return fmt.Errorf("ServerReflectionRequest.MessageRequest has unexpected type %T", x)
- }
- return nil
-}
-
-func _ServerReflectionRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
- m := msg.(*ServerReflectionRequest)
- switch tag {
- case 3: // message_request.file_by_filename
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- x, err := b.DecodeStringBytes()
- m.MessageRequest = &ServerReflectionRequest_FileByFilename{x}
- return true, err
- case 4: // message_request.file_containing_symbol
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- x, err := b.DecodeStringBytes()
- m.MessageRequest = &ServerReflectionRequest_FileContainingSymbol{x}
- return true, err
- case 5: // message_request.file_containing_extension
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- msg := new(ExtensionRequest)
- err := b.DecodeMessage(msg)
- m.MessageRequest = &ServerReflectionRequest_FileContainingExtension{msg}
- return true, err
- case 6: // message_request.all_extension_numbers_of_type
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- x, err := b.DecodeStringBytes()
- m.MessageRequest = &ServerReflectionRequest_AllExtensionNumbersOfType{x}
- return true, err
- case 7: // message_request.list_services
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- x, err := b.DecodeStringBytes()
- m.MessageRequest = &ServerReflectionRequest_ListServices{x}
- return true, err
- default:
- return false, nil
- }
-}
-
-func _ServerReflectionRequest_OneofSizer(msg proto.Message) (n int) {
- m := msg.(*ServerReflectionRequest)
- // message_request
- switch x := m.MessageRequest.(type) {
- case *ServerReflectionRequest_FileByFilename:
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(len(x.FileByFilename)))
- n += len(x.FileByFilename)
- case *ServerReflectionRequest_FileContainingSymbol:
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(len(x.FileContainingSymbol)))
- n += len(x.FileContainingSymbol)
- case *ServerReflectionRequest_FileContainingExtension:
- s := proto.Size(x.FileContainingExtension)
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(s))
- n += s
- case *ServerReflectionRequest_AllExtensionNumbersOfType:
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(len(x.AllExtensionNumbersOfType)))
- n += len(x.AllExtensionNumbersOfType)
- case *ServerReflectionRequest_ListServices:
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(len(x.ListServices)))
- n += len(x.ListServices)
- case nil:
- default:
- panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
- }
- return n
-}
-
// The type name and extension number sent by the client when requesting
// file_containing_extension.
type ExtensionRequest struct {
@@ -279,16 +177,17 @@
func (m *ExtensionRequest) String() string { return proto.CompactTextString(m) }
func (*ExtensionRequest) ProtoMessage() {}
func (*ExtensionRequest) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{1}
+ return fileDescriptor_42a8ac412db3cb03, []int{1}
}
+
func (m *ExtensionRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ExtensionRequest.Unmarshal(m, b)
}
func (m *ExtensionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ExtensionRequest.Marshal(b, m, deterministic)
}
-func (dst *ExtensionRequest) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ExtensionRequest.Merge(dst, src)
+func (m *ExtensionRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ExtensionRequest.Merge(m, src)
}
func (m *ExtensionRequest) XXX_Size() int {
return xxx_messageInfo_ExtensionRequest.Size(m)
@@ -335,16 +234,17 @@
func (m *ServerReflectionResponse) String() string { return proto.CompactTextString(m) }
func (*ServerReflectionResponse) ProtoMessage() {}
func (*ServerReflectionResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{2}
+ return fileDescriptor_42a8ac412db3cb03, []int{2}
}
+
func (m *ServerReflectionResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ServerReflectionResponse.Unmarshal(m, b)
}
func (m *ServerReflectionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ServerReflectionResponse.Marshal(b, m, deterministic)
}
-func (dst *ServerReflectionResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ServerReflectionResponse.Merge(dst, src)
+func (m *ServerReflectionResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ServerReflectionResponse.Merge(m, src)
}
func (m *ServerReflectionResponse) XXX_Size() int {
return xxx_messageInfo_ServerReflectionResponse.Size(m)
@@ -433,9 +333,9 @@
return nil
}
-// XXX_OneofFuncs is for the internal use of the proto package.
-func (*ServerReflectionResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
- return _ServerReflectionResponse_OneofMarshaler, _ServerReflectionResponse_OneofUnmarshaler, _ServerReflectionResponse_OneofSizer, []interface{}{
+// XXX_OneofWrappers is for the internal use of the proto package.
+func (*ServerReflectionResponse) XXX_OneofWrappers() []interface{} {
+ return []interface{}{
(*ServerReflectionResponse_FileDescriptorResponse)(nil),
(*ServerReflectionResponse_AllExtensionNumbersResponse)(nil),
(*ServerReflectionResponse_ListServicesResponse)(nil),
@@ -443,108 +343,6 @@
}
}
-func _ServerReflectionResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
- m := msg.(*ServerReflectionResponse)
- // message_response
- switch x := m.MessageResponse.(type) {
- case *ServerReflectionResponse_FileDescriptorResponse:
- b.EncodeVarint(4<<3 | proto.WireBytes)
- if err := b.EncodeMessage(x.FileDescriptorResponse); err != nil {
- return err
- }
- case *ServerReflectionResponse_AllExtensionNumbersResponse:
- b.EncodeVarint(5<<3 | proto.WireBytes)
- if err := b.EncodeMessage(x.AllExtensionNumbersResponse); err != nil {
- return err
- }
- case *ServerReflectionResponse_ListServicesResponse:
- b.EncodeVarint(6<<3 | proto.WireBytes)
- if err := b.EncodeMessage(x.ListServicesResponse); err != nil {
- return err
- }
- case *ServerReflectionResponse_ErrorResponse:
- b.EncodeVarint(7<<3 | proto.WireBytes)
- if err := b.EncodeMessage(x.ErrorResponse); err != nil {
- return err
- }
- case nil:
- default:
- return fmt.Errorf("ServerReflectionResponse.MessageResponse has unexpected type %T", x)
- }
- return nil
-}
-
-func _ServerReflectionResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
- m := msg.(*ServerReflectionResponse)
- switch tag {
- case 4: // message_response.file_descriptor_response
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- msg := new(FileDescriptorResponse)
- err := b.DecodeMessage(msg)
- m.MessageResponse = &ServerReflectionResponse_FileDescriptorResponse{msg}
- return true, err
- case 5: // message_response.all_extension_numbers_response
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- msg := new(ExtensionNumberResponse)
- err := b.DecodeMessage(msg)
- m.MessageResponse = &ServerReflectionResponse_AllExtensionNumbersResponse{msg}
- return true, err
- case 6: // message_response.list_services_response
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- msg := new(ListServiceResponse)
- err := b.DecodeMessage(msg)
- m.MessageResponse = &ServerReflectionResponse_ListServicesResponse{msg}
- return true, err
- case 7: // message_response.error_response
- if wire != proto.WireBytes {
- return true, proto.ErrInternalBadWireType
- }
- msg := new(ErrorResponse)
- err := b.DecodeMessage(msg)
- m.MessageResponse = &ServerReflectionResponse_ErrorResponse{msg}
- return true, err
- default:
- return false, nil
- }
-}
-
-func _ServerReflectionResponse_OneofSizer(msg proto.Message) (n int) {
- m := msg.(*ServerReflectionResponse)
- // message_response
- switch x := m.MessageResponse.(type) {
- case *ServerReflectionResponse_FileDescriptorResponse:
- s := proto.Size(x.FileDescriptorResponse)
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(s))
- n += s
- case *ServerReflectionResponse_AllExtensionNumbersResponse:
- s := proto.Size(x.AllExtensionNumbersResponse)
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(s))
- n += s
- case *ServerReflectionResponse_ListServicesResponse:
- s := proto.Size(x.ListServicesResponse)
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(s))
- n += s
- case *ServerReflectionResponse_ErrorResponse:
- s := proto.Size(x.ErrorResponse)
- n += 1 // tag and wire
- n += proto.SizeVarint(uint64(s))
- n += s
- case nil:
- default:
- panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
- }
- return n
-}
-
// Serialized FileDescriptorProto messages sent by the server answering
// a file_by_filename, file_containing_symbol, or file_containing_extension
// request.
@@ -562,16 +360,17 @@
func (m *FileDescriptorResponse) String() string { return proto.CompactTextString(m) }
func (*FileDescriptorResponse) ProtoMessage() {}
func (*FileDescriptorResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{3}
+ return fileDescriptor_42a8ac412db3cb03, []int{3}
}
+
func (m *FileDescriptorResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FileDescriptorResponse.Unmarshal(m, b)
}
func (m *FileDescriptorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FileDescriptorResponse.Marshal(b, m, deterministic)
}
-func (dst *FileDescriptorResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_FileDescriptorResponse.Merge(dst, src)
+func (m *FileDescriptorResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_FileDescriptorResponse.Merge(m, src)
}
func (m *FileDescriptorResponse) XXX_Size() int {
return xxx_messageInfo_FileDescriptorResponse.Size(m)
@@ -605,16 +404,17 @@
func (m *ExtensionNumberResponse) String() string { return proto.CompactTextString(m) }
func (*ExtensionNumberResponse) ProtoMessage() {}
func (*ExtensionNumberResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{4}
+ return fileDescriptor_42a8ac412db3cb03, []int{4}
}
+
func (m *ExtensionNumberResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ExtensionNumberResponse.Unmarshal(m, b)
}
func (m *ExtensionNumberResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ExtensionNumberResponse.Marshal(b, m, deterministic)
}
-func (dst *ExtensionNumberResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ExtensionNumberResponse.Merge(dst, src)
+func (m *ExtensionNumberResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ExtensionNumberResponse.Merge(m, src)
}
func (m *ExtensionNumberResponse) XXX_Size() int {
return xxx_messageInfo_ExtensionNumberResponse.Size(m)
@@ -653,16 +453,17 @@
func (m *ListServiceResponse) String() string { return proto.CompactTextString(m) }
func (*ListServiceResponse) ProtoMessage() {}
func (*ListServiceResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{5}
+ return fileDescriptor_42a8ac412db3cb03, []int{5}
}
+
func (m *ListServiceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListServiceResponse.Unmarshal(m, b)
}
func (m *ListServiceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListServiceResponse.Marshal(b, m, deterministic)
}
-func (dst *ListServiceResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ListServiceResponse.Merge(dst, src)
+func (m *ListServiceResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ListServiceResponse.Merge(m, src)
}
func (m *ListServiceResponse) XXX_Size() int {
return xxx_messageInfo_ListServiceResponse.Size(m)
@@ -695,16 +496,17 @@
func (m *ServiceResponse) String() string { return proto.CompactTextString(m) }
func (*ServiceResponse) ProtoMessage() {}
func (*ServiceResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{6}
+ return fileDescriptor_42a8ac412db3cb03, []int{6}
}
+
func (m *ServiceResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ServiceResponse.Unmarshal(m, b)
}
func (m *ServiceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ServiceResponse.Marshal(b, m, deterministic)
}
-func (dst *ServiceResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ServiceResponse.Merge(dst, src)
+func (m *ServiceResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ServiceResponse.Merge(m, src)
}
func (m *ServiceResponse) XXX_Size() int {
return xxx_messageInfo_ServiceResponse.Size(m)
@@ -736,16 +538,17 @@
func (m *ErrorResponse) String() string { return proto.CompactTextString(m) }
func (*ErrorResponse) ProtoMessage() {}
func (*ErrorResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor_reflection_178bd1e101bf8b63, []int{7}
+ return fileDescriptor_42a8ac412db3cb03, []int{7}
}
+
func (m *ErrorResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ErrorResponse.Unmarshal(m, b)
}
func (m *ErrorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ErrorResponse.Marshal(b, m, deterministic)
}
-func (dst *ErrorResponse) XXX_Merge(src proto.Message) {
- xxx_messageInfo_ErrorResponse.Merge(dst, src)
+func (m *ErrorResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_ErrorResponse.Merge(m, src)
}
func (m *ErrorResponse) XXX_Size() int {
return xxx_messageInfo_ErrorResponse.Size(m)
@@ -781,6 +584,55 @@
proto.RegisterType((*ErrorResponse)(nil), "grpc.reflection.v1alpha.ErrorResponse")
}
+func init() {
+ proto.RegisterFile("grpc_reflection_v1alpha/reflection.proto", fileDescriptor_42a8ac412db3cb03)
+}
+
+var fileDescriptor_42a8ac412db3cb03 = []byte{
+ // 656 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x51, 0x73, 0xd2, 0x40,
+ 0x10, 0x6e, 0x5a, 0x68, 0x87, 0x85, 0x02, 0x5e, 0x2b, 0xa4, 0x3a, 0x75, 0x98, 0x68, 0x35, 0x75,
+ 0x1c, 0xda, 0xe2, 0x8c, 0x3f, 0x80, 0xaa, 0x83, 0x33, 0xb5, 0x75, 0x0e, 0x5f, 0x1c, 0x1f, 0x6e,
+ 0x02, 0x2c, 0x34, 0x1a, 0x72, 0xf1, 0x2e, 0x45, 0x79, 0xf2, 0x47, 0xf8, 0xa3, 0xfc, 0x4b, 0x3e,
+ 0x3a, 0x77, 0x09, 0x21, 0xa4, 0x44, 0xa7, 0x4f, 0x30, 0xdf, 0xee, 0xde, 0xb7, 0xbb, 0xdf, 0xb7,
+ 0x01, 0x7b, 0x22, 0x82, 0x21, 0x13, 0x38, 0xf6, 0x70, 0x18, 0xba, 0xdc, 0x67, 0xb3, 0x33, 0xc7,
+ 0x0b, 0xae, 0x9d, 0x93, 0x25, 0xd4, 0x0e, 0x04, 0x0f, 0x39, 0x69, 0xaa, 0xcc, 0x76, 0x0a, 0x8e,
+ 0x33, 0xad, 0x3f, 0x9b, 0xd0, 0xec, 0xa3, 0x98, 0xa1, 0xa0, 0x49, 0x90, 0xe2, 0xb7, 0x1b, 0x94,
+ 0x21, 0x21, 0x50, 0xb8, 0xe6, 0x32, 0x34, 0x8d, 0x96, 0x61, 0x97, 0xa8, 0xfe, 0x4f, 0x9e, 0x43,
+ 0x7d, 0xec, 0x7a, 0xc8, 0x06, 0x73, 0xa6, 0x7e, 0x7d, 0x67, 0x8a, 0xe6, 0x96, 0x8a, 0xf7, 0x36,
+ 0x68, 0x55, 0x21, 0xdd, 0xf9, 0xdb, 0x18, 0x27, 0xaf, 0xa0, 0xa1, 0x73, 0x87, 0xdc, 0x0f, 0x1d,
+ 0xd7, 0x77, 0xfd, 0x09, 0x93, 0xf3, 0xe9, 0x80, 0x7b, 0x66, 0x21, 0xae, 0xd8, 0x57, 0xf1, 0xf3,
+ 0x24, 0xdc, 0xd7, 0x51, 0x32, 0x81, 0x83, 0x6c, 0x1d, 0xfe, 0x08, 0xd1, 0x97, 0x2e, 0xf7, 0xcd,
+ 0x62, 0xcb, 0xb0, 0xcb, 0x9d, 0xe3, 0x76, 0xce, 0x40, 0xed, 0x37, 0x8b, 0xcc, 0x78, 0x8a, 0xde,
+ 0x06, 0x6d, 0xae, 0xb2, 0x24, 0x19, 0xa4, 0x0b, 0x87, 0x8e, 0xe7, 0x2d, 0x1f, 0x67, 0xfe, 0xcd,
+ 0x74, 0x80, 0x42, 0x32, 0x3e, 0x66, 0xe1, 0x3c, 0x40, 0x73, 0x3b, 0xee, 0xf3, 0xc0, 0xf1, 0xbc,
+ 0xa4, 0xec, 0x32, 0x4a, 0xba, 0x1a, 0x7f, 0x9c, 0x07, 0x48, 0x8e, 0x60, 0xd7, 0x73, 0x65, 0xc8,
+ 0x24, 0x8a, 0x99, 0x3b, 0x44, 0x69, 0xee, 0xc4, 0x35, 0x15, 0x05, 0xf7, 0x63, 0xb4, 0x7b, 0x0f,
+ 0x6a, 0x53, 0x94, 0xd2, 0x99, 0x20, 0x13, 0x51, 0x63, 0xd6, 0x18, 0xea, 0xd9, 0x66, 0xc9, 0x33,
+ 0xa8, 0xa5, 0xa6, 0xd6, 0x3d, 0x44, 0xdb, 0xaf, 0x2e, 0x61, 0x4d, 0x7b, 0x0c, 0xf5, 0x6c, 0xdb,
+ 0xe6, 0x66, 0xcb, 0xb0, 0x8b, 0xb4, 0x86, 0xab, 0x8d, 0x5a, 0xbf, 0x0b, 0x60, 0xde, 0x96, 0x58,
+ 0x06, 0xdc, 0x97, 0x48, 0x0e, 0x01, 0x66, 0x8e, 0xe7, 0x8e, 0x58, 0x4a, 0xe9, 0x92, 0x46, 0x7a,
+ 0x4a, 0xee, 0xcf, 0x50, 0xe7, 0xc2, 0x9d, 0xb8, 0xbe, 0xe3, 0x2d, 0xfa, 0xd6, 0x34, 0xe5, 0xce,
+ 0x69, 0xae, 0x02, 0x39, 0x76, 0xa2, 0xb5, 0xc5, 0x4b, 0x8b, 0x61, 0xbf, 0x82, 0xa9, 0x75, 0x1e,
+ 0xa1, 0x1c, 0x0a, 0x37, 0x08, 0xb9, 0x60, 0x22, 0xee, 0x4b, 0x3b, 0xa4, 0xdc, 0x39, 0xc9, 0x25,
+ 0x51, 0x26, 0x7b, 0x9d, 0xd4, 0x2d, 0xc6, 0xe9, 0x6d, 0x50, 0x6d, 0xb9, 0xdb, 0x11, 0xf2, 0x1d,
+ 0x1e, 0xad, 0xd7, 0x3a, 0xa1, 0x2c, 0xfe, 0x67, 0xae, 0x8c, 0x01, 0x52, 0x9c, 0x0f, 0xd7, 0xd8,
+ 0x23, 0x21, 0x1e, 0x41, 0x63, 0xc5, 0x20, 0x4b, 0xc2, 0x6d, 0x4d, 0xf8, 0x22, 0x97, 0xf0, 0x62,
+ 0x69, 0xa0, 0x14, 0xd9, 0x7e, 0xda, 0x57, 0x09, 0xcb, 0x15, 0x54, 0x51, 0x88, 0xf4, 0x06, 0x77,
+ 0xf4, 0xeb, 0x4f, 0xf3, 0xc7, 0x51, 0xe9, 0xa9, 0x77, 0x77, 0x31, 0x0d, 0x74, 0x09, 0xd4, 0x97,
+ 0x86, 0x8d, 0x30, 0xeb, 0x02, 0x1a, 0xeb, 0xf7, 0x4e, 0x3a, 0x70, 0x3f, 0x2b, 0xa5, 0xfe, 0xf0,
+ 0x98, 0x46, 0x6b, 0xcb, 0xae, 0xd0, 0xbd, 0x55, 0x51, 0x3e, 0xa8, 0x90, 0xf5, 0x05, 0x9a, 0x39,
+ 0x2b, 0x25, 0x4f, 0xa0, 0x3a, 0x70, 0x24, 0xea, 0x03, 0x60, 0xfa, 0x1b, 0x13, 0x39, 0xb3, 0xa2,
+ 0x50, 0xe5, 0xff, 0x4b, 0xf5, 0x7d, 0x59, 0x7f, 0x03, 0x5b, 0xeb, 0x6e, 0xe0, 0x13, 0xec, 0xad,
+ 0xd9, 0x26, 0xe9, 0xc2, 0x4e, 0x2c, 0x8b, 0x6e, 0xb4, 0xdc, 0xb1, 0xff, 0xe9, 0xea, 0x54, 0x29,
+ 0x5d, 0x14, 0x5a, 0x47, 0x50, 0xcb, 0x3e, 0x4b, 0xa0, 0x90, 0x6a, 0x5a, 0xff, 0xb7, 0xfa, 0xb0,
+ 0xbb, 0xb2, 0x71, 0x75, 0x79, 0x91, 0x62, 0x43, 0x3e, 0x8a, 0x52, 0x8b, 0xb4, 0xa4, 0x91, 0x73,
+ 0x3e, 0x42, 0xf2, 0x18, 0x22, 0x41, 0x58, 0xac, 0x82, 0x3e, 0xbb, 0x12, 0xad, 0x68, 0xf0, 0x7d,
+ 0x84, 0x75, 0x7e, 0x19, 0x50, 0xcf, 0x9e, 0x1b, 0xf9, 0x09, 0xfb, 0x59, 0xec, 0x9d, 0x3f, 0xe6,
+ 0xe4, 0xce, 0x17, 0xfb, 0xe0, 0xec, 0x0e, 0x15, 0xd1, 0x54, 0xb6, 0x71, 0x6a, 0x0c, 0xb6, 0xb5,
+ 0xf4, 0x2f, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x85, 0x02, 0x09, 0x9d, 0x9f, 0x06, 0x00, 0x00,
+}
+
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
@@ -844,6 +696,14 @@
ServerReflectionInfo(ServerReflection_ServerReflectionInfoServer) error
}
+// UnimplementedServerReflectionServer can be embedded to have forward compatible implementations.
+type UnimplementedServerReflectionServer struct {
+}
+
+func (*UnimplementedServerReflectionServer) ServerReflectionInfo(srv ServerReflection_ServerReflectionInfoServer) error {
+ return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented")
+}
+
func RegisterServerReflectionServer(s *grpc.Server, srv ServerReflectionServer) {
s.RegisterService(&_ServerReflection_serviceDesc, srv)
}
@@ -888,52 +748,3 @@
},
Metadata: "grpc_reflection_v1alpha/reflection.proto",
}
-
-func init() {
- proto.RegisterFile("grpc_reflection_v1alpha/reflection.proto", fileDescriptor_reflection_178bd1e101bf8b63)
-}
-
-var fileDescriptor_reflection_178bd1e101bf8b63 = []byte{
- // 656 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x51, 0x73, 0xd2, 0x40,
- 0x10, 0x6e, 0x5a, 0x68, 0x87, 0x85, 0x02, 0x5e, 0x2b, 0xa4, 0x3a, 0x75, 0x98, 0x68, 0x35, 0x75,
- 0x1c, 0xda, 0xe2, 0x8c, 0x3f, 0x80, 0xaa, 0x83, 0x33, 0xb5, 0x75, 0x0e, 0x5f, 0x1c, 0x1f, 0x6e,
- 0x02, 0x2c, 0x34, 0x1a, 0x72, 0xf1, 0x2e, 0x45, 0x79, 0xf2, 0x47, 0xf8, 0xa3, 0xfc, 0x4b, 0x3e,
- 0x3a, 0x77, 0x09, 0x21, 0xa4, 0x44, 0xa7, 0x4f, 0x30, 0xdf, 0xee, 0xde, 0xb7, 0xbb, 0xdf, 0xb7,
- 0x01, 0x7b, 0x22, 0x82, 0x21, 0x13, 0x38, 0xf6, 0x70, 0x18, 0xba, 0xdc, 0x67, 0xb3, 0x33, 0xc7,
- 0x0b, 0xae, 0x9d, 0x93, 0x25, 0xd4, 0x0e, 0x04, 0x0f, 0x39, 0x69, 0xaa, 0xcc, 0x76, 0x0a, 0x8e,
- 0x33, 0xad, 0x3f, 0x9b, 0xd0, 0xec, 0xa3, 0x98, 0xa1, 0xa0, 0x49, 0x90, 0xe2, 0xb7, 0x1b, 0x94,
- 0x21, 0x21, 0x50, 0xb8, 0xe6, 0x32, 0x34, 0x8d, 0x96, 0x61, 0x97, 0xa8, 0xfe, 0x4f, 0x9e, 0x43,
- 0x7d, 0xec, 0x7a, 0xc8, 0x06, 0x73, 0xa6, 0x7e, 0x7d, 0x67, 0x8a, 0xe6, 0x96, 0x8a, 0xf7, 0x36,
- 0x68, 0x55, 0x21, 0xdd, 0xf9, 0xdb, 0x18, 0x27, 0xaf, 0xa0, 0xa1, 0x73, 0x87, 0xdc, 0x0f, 0x1d,
- 0xd7, 0x77, 0xfd, 0x09, 0x93, 0xf3, 0xe9, 0x80, 0x7b, 0x66, 0x21, 0xae, 0xd8, 0x57, 0xf1, 0xf3,
- 0x24, 0xdc, 0xd7, 0x51, 0x32, 0x81, 0x83, 0x6c, 0x1d, 0xfe, 0x08, 0xd1, 0x97, 0x2e, 0xf7, 0xcd,
- 0x62, 0xcb, 0xb0, 0xcb, 0x9d, 0xe3, 0x76, 0xce, 0x40, 0xed, 0x37, 0x8b, 0xcc, 0x78, 0x8a, 0xde,
- 0x06, 0x6d, 0xae, 0xb2, 0x24, 0x19, 0xa4, 0x0b, 0x87, 0x8e, 0xe7, 0x2d, 0x1f, 0x67, 0xfe, 0xcd,
- 0x74, 0x80, 0x42, 0x32, 0x3e, 0x66, 0xe1, 0x3c, 0x40, 0x73, 0x3b, 0xee, 0xf3, 0xc0, 0xf1, 0xbc,
- 0xa4, 0xec, 0x32, 0x4a, 0xba, 0x1a, 0x7f, 0x9c, 0x07, 0x48, 0x8e, 0x60, 0xd7, 0x73, 0x65, 0xc8,
- 0x24, 0x8a, 0x99, 0x3b, 0x44, 0x69, 0xee, 0xc4, 0x35, 0x15, 0x05, 0xf7, 0x63, 0xb4, 0x7b, 0x0f,
- 0x6a, 0x53, 0x94, 0xd2, 0x99, 0x20, 0x13, 0x51, 0x63, 0xd6, 0x18, 0xea, 0xd9, 0x66, 0xc9, 0x33,
- 0xa8, 0xa5, 0xa6, 0xd6, 0x3d, 0x44, 0xdb, 0xaf, 0x2e, 0x61, 0x4d, 0x7b, 0x0c, 0xf5, 0x6c, 0xdb,
- 0xe6, 0x66, 0xcb, 0xb0, 0x8b, 0xb4, 0x86, 0xab, 0x8d, 0x5a, 0xbf, 0x0b, 0x60, 0xde, 0x96, 0x58,
- 0x06, 0xdc, 0x97, 0x48, 0x0e, 0x01, 0x66, 0x8e, 0xe7, 0x8e, 0x58, 0x4a, 0xe9, 0x92, 0x46, 0x7a,
- 0x4a, 0xee, 0xcf, 0x50, 0xe7, 0xc2, 0x9d, 0xb8, 0xbe, 0xe3, 0x2d, 0xfa, 0xd6, 0x34, 0xe5, 0xce,
- 0x69, 0xae, 0x02, 0x39, 0x76, 0xa2, 0xb5, 0xc5, 0x4b, 0x8b, 0x61, 0xbf, 0x82, 0xa9, 0x75, 0x1e,
- 0xa1, 0x1c, 0x0a, 0x37, 0x08, 0xb9, 0x60, 0x22, 0xee, 0x4b, 0x3b, 0xa4, 0xdc, 0x39, 0xc9, 0x25,
- 0x51, 0x26, 0x7b, 0x9d, 0xd4, 0x2d, 0xc6, 0xe9, 0x6d, 0x50, 0x6d, 0xb9, 0xdb, 0x11, 0xf2, 0x1d,
- 0x1e, 0xad, 0xd7, 0x3a, 0xa1, 0x2c, 0xfe, 0x67, 0xae, 0x8c, 0x01, 0x52, 0x9c, 0x0f, 0xd7, 0xd8,
- 0x23, 0x21, 0x1e, 0x41, 0x63, 0xc5, 0x20, 0x4b, 0xc2, 0x6d, 0x4d, 0xf8, 0x22, 0x97, 0xf0, 0x62,
- 0x69, 0xa0, 0x14, 0xd9, 0x7e, 0xda, 0x57, 0x09, 0xcb, 0x15, 0x54, 0x51, 0x88, 0xf4, 0x06, 0x77,
- 0xf4, 0xeb, 0x4f, 0xf3, 0xc7, 0x51, 0xe9, 0xa9, 0x77, 0x77, 0x31, 0x0d, 0x74, 0x09, 0xd4, 0x97,
- 0x86, 0x8d, 0x30, 0xeb, 0x02, 0x1a, 0xeb, 0xf7, 0x4e, 0x3a, 0x70, 0x3f, 0x2b, 0xa5, 0xfe, 0xf0,
- 0x98, 0x46, 0x6b, 0xcb, 0xae, 0xd0, 0xbd, 0x55, 0x51, 0x3e, 0xa8, 0x90, 0xf5, 0x05, 0x9a, 0x39,
- 0x2b, 0x25, 0x4f, 0xa0, 0x3a, 0x70, 0x24, 0xea, 0x03, 0x60, 0xfa, 0x1b, 0x13, 0x39, 0xb3, 0xa2,
- 0x50, 0xe5, 0xff, 0x4b, 0xf5, 0x7d, 0x59, 0x7f, 0x03, 0x5b, 0xeb, 0x6e, 0xe0, 0x13, 0xec, 0xad,
- 0xd9, 0x26, 0xe9, 0xc2, 0x4e, 0x2c, 0x8b, 0x6e, 0xb4, 0xdc, 0xb1, 0xff, 0xe9, 0xea, 0x54, 0x29,
- 0x5d, 0x14, 0x5a, 0x47, 0x50, 0xcb, 0x3e, 0x4b, 0xa0, 0x90, 0x6a, 0x5a, 0xff, 0xb7, 0xfa, 0xb0,
- 0xbb, 0xb2, 0x71, 0x75, 0x79, 0x91, 0x62, 0x43, 0x3e, 0x8a, 0x52, 0x8b, 0xb4, 0xa4, 0x91, 0x73,
- 0x3e, 0x42, 0xf2, 0x18, 0x22, 0x41, 0x58, 0xac, 0x82, 0x3e, 0xbb, 0x12, 0xad, 0x68, 0xf0, 0x7d,
- 0x84, 0x75, 0x7e, 0x19, 0x50, 0xcf, 0x9e, 0x1b, 0xf9, 0x09, 0xfb, 0x59, 0xec, 0x9d, 0x3f, 0xe6,
- 0xe4, 0xce, 0x17, 0xfb, 0xe0, 0xec, 0x0e, 0x15, 0xd1, 0x54, 0xb6, 0x71, 0x6a, 0x0c, 0xb6, 0xb5,
- 0xf4, 0x2f, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0x85, 0x02, 0x09, 0x9d, 0x9f, 0x06, 0x00, 0x00,
-}
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index f6f6934..e83da34 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -20,6 +20,10 @@
// All APIs in this package are experimental.
package resolver
+import (
+ "google.golang.org/grpc/serviceconfig"
+)
+
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
@@ -100,11 +104,12 @@
// State contains the current Resolver state relevant to the ClientConn.
type State struct {
- Addresses []Address // Resolved addresses for the target
- ServiceConfig string // JSON representation of the service config
+ Addresses []Address // Resolved addresses for the target
+ // ServiceConfig is the parsed service config; obtained from
+ // serviceconfig.Parse.
+ ServiceConfig serviceconfig.Config
// TODO: add Err error
- // TODO: add ParsedServiceConfig interface{}
}
// ClientConn contains the callbacks for resolver to notify any updates
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index e9cef3a..6934905 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -138,19 +138,22 @@
return
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
- if channelz.IsOn() {
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: sc})
+ c, err := parseServiceConfig(sc)
+ if err != nil {
+ return
}
- ccr.curState.ServiceConfig = sc
+ if channelz.IsOn() {
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: c})
+ }
+ ccr.curState.ServiceConfig = c
ccr.cc.updateResolverState(ccr.curState)
}
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
- if s.ServiceConfig == ccr.curState.ServiceConfig && (len(ccr.curState.Addresses) == 0) == (len(s.Addresses) == 0) {
- return
- }
var updates []string
- if s.ServiceConfig != ccr.curState.ServiceConfig {
+ oldSC, oldOK := ccr.curState.ServiceConfig.(*ServiceConfig)
+ newSC, newOK := s.ServiceConfig.(*ServiceConfig)
+ if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated")
}
if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 495a4f9..f064b73 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -42,6 +42,7 @@
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -56,6 +57,8 @@
defaultServerMaxSendMessageSize = math.MaxInt32
)
+var statusOK = status.New(codes.OK, "")
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -90,17 +93,15 @@
mu sync.Mutex // guards following
lis map[net.Listener]bool
- conns map[io.Closer]bool
+ conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
- quit chan struct{}
- done chan struct{}
- quitOnce sync.Once
- doneOnce sync.Once
+ quit *grpcsync.Event
+ done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
@@ -386,10 +387,10 @@
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
- conns: make(map[io.Closer]bool),
+ conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
- quit: make(chan struct{}),
- done: make(chan struct{}),
+ quit: grpcsync.NewEvent(),
+ done: grpcsync.NewEvent(),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
@@ -556,11 +557,9 @@
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
- select {
- // Stop or GracefulStop called; block until done and return nil.
- case <-s.quit:
- <-s.done
- default:
+ if s.quit.HasFired() {
+ // Stop or GracefulStop called; block until done and return nil.
+ <-s.done.Done()
}
}()
@@ -603,7 +602,7 @@
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.quit:
+ case <-s.quit.Done():
timer.Stop()
return nil
}
@@ -613,10 +612,8 @@
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
- select {
- case <-s.quit:
+ if s.quit.HasFired() {
return nil
- default:
}
return err
}
@@ -637,6 +634,10 @@
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ if s.quit.HasFired() {
+ rawConn.Close()
+ return
+ }
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
@@ -653,14 +654,6 @@
return
}
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- conn.Close()
- return
- }
- s.mu.Unlock()
-
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
@@ -768,6 +761,9 @@
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
+ if !EnableTracing {
+ return nil
+ }
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
@@ -786,27 +782,27 @@
return trInfo
}
-func (s *Server) addConn(c io.Closer) bool {
+func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
- c.Close()
+ st.Close()
return false
}
if s.drain {
// Transport added after we drained our existing conns: drain it
// immediately.
- c.(transport.ServerTransport).Drain()
+ st.Drain()
}
- s.conns[c] = true
+ s.conns[st] = true
return true
}
-func (s *Server) removeConn(c io.Closer) {
+func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
- delete(s.conns, c)
+ delete(s.conns, st)
s.cv.Broadcast()
}
}
@@ -978,10 +974,11 @@
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: v,
- Data: d,
- Length: len(d),
+ RecvTime: time.Now(),
+ Payload: v,
+ WireLength: payInfo.wireLength,
+ Data: d,
+ Length: len(d),
})
}
if binlog != nil {
@@ -1077,7 +1074,7 @@
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- err = t.WriteStatus(stream, status.New(codes.OK, ""))
+ err = t.WriteStatus(stream, statusOK)
if binlog != nil {
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
@@ -1235,7 +1232,7 @@
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ err = t.WriteStatus(ss.s, statusOK)
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
@@ -1352,15 +1349,11 @@
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
+ s.quit.Fire()
defer func() {
s.serveWG.Wait()
- s.doneOnce.Do(func() {
- close(s.done)
- })
+ s.done.Fire()
}()
s.channelzRemoveOnce.Do(func() {
@@ -1397,15 +1390,8 @@
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
- s.quitOnce.Do(func() {
- close(s.quit)
- })
-
- defer func() {
- s.doneOnce.Do(func() {
- close(s.done)
- })
- }()
+ s.quit.Fire()
+ defer s.done.Fire()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
@@ -1423,8 +1409,8 @@
}
s.lis = nil
if !s.drain {
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ for st := range s.conns {
+ st.Drain()
}
s.drain = true
}
diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go
index 1c52274..686ad7b 100644
--- a/vendor/google.golang.org/grpc/service_config.go
+++ b/vendor/google.golang.org/grpc/service_config.go
@@ -25,8 +25,11 @@
"strings"
"time"
+ "google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/serviceconfig"
)
const maxInt = int(^uint(0) >> 1)
@@ -61,6 +64,11 @@
retryPolicy *retryPolicy
}
+type lbConfig struct {
+ name string
+ cfg serviceconfig.LoadBalancingConfig
+}
+
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
//
@@ -68,10 +76,18 @@
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
- // LB is the load balancer the service providers recommends. The balancer specified
- // via grpc.WithBalancer will override this.
+ serviceconfig.Config
+
+ // LB is the load balancer the service provider recommends. The balancer
+ // specified via grpc.WithBalancer will override this. This is deprecated;
+ // lbConfig is preferred. If lbConfig and LB are both present, lbConfig
+ // will be used.
LB *string
+ // lbConfig is the service config's load balancing configuration. If
+ // lbConfig and LB are both present, lbConfig will be used.
+ lbConfig *lbConfig
+
// Methods contains a map for the methods in this service. If there is an
// exact match for a method (i.e. /service/method) in the map, use the
// corresponding MethodConfig. If there's no exact match, look for the
@@ -233,15 +249,27 @@
RetryPolicy *jsonRetryPolicy
}
+type loadBalancingConfig map[string]json.RawMessage
+
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
+ LoadBalancingConfig *[]loadBalancingConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}
+func init() {
+ internal.ParseServiceConfig = func(sc string) (interface{}, error) {
+ return parseServiceConfig(sc)
+ }
+}
+
func parseServiceConfig(js string) (*ServiceConfig, error) {
+ if len(js) == 0 {
+ return nil, fmt.Errorf("no JSON service config provided")
+ }
var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc)
if err != nil {
@@ -255,10 +283,46 @@
healthCheckConfig: rsc.HealthCheckConfig,
rawJSONString: js,
}
+ if rsc.LoadBalancingConfig != nil {
+ for i, lbcfg := range *rsc.LoadBalancingConfig {
+ if len(lbcfg) != 1 {
+ err := fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
+ grpclog.Warningf(err.Error())
+ return nil, err
+ }
+ var name string
+ var jsonCfg json.RawMessage
+ for name, jsonCfg = range lbcfg {
+ }
+ builder := balancer.Get(name)
+ if builder == nil {
+ continue
+ }
+ sc.lbConfig = &lbConfig{name: name}
+ if parser, ok := builder.(balancer.ConfigParser); ok {
+ var err error
+ sc.lbConfig.cfg, err = parser.ParseConfig(jsonCfg)
+ if err != nil {
+ return nil, fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
+ }
+ } else if string(jsonCfg) != "{}" {
+ grpclog.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
+ }
+ break
+ }
+ if sc.lbConfig == nil {
+ // We had a loadBalancingConfig field but did not encounter a
+ // supported policy. The config is considered invalid in this
+ // case.
+ err := fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
+ grpclog.Warningf(err.Error())
+ return nil, err
+ }
+ }
+
if rsc.MethodConfig == nil {
return &sc, nil
}
-
for _, m := range *rsc.MethodConfig {
if m.Name == nil {
continue
@@ -299,11 +363,11 @@
}
if sc.retryThrottling != nil {
- if sc.retryThrottling.MaxTokens <= 0 ||
- sc.retryThrottling.MaxTokens > 1000 ||
- sc.retryThrottling.TokenRatio <= 0 {
- // Illegal throttling config; disable throttling.
- sc.retryThrottling = nil
+ if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
+ return nil, fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)
+ }
+ if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
+ return nil, fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)
}
}
return &sc, nil
diff --git a/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go b/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go
new file mode 100644
index 0000000..53b2787
--- /dev/null
+++ b/vendor/google.golang.org/grpc/serviceconfig/serviceconfig.go
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package serviceconfig defines types and methods for operating on gRPC
+// service configs.
+//
+// This package is EXPERIMENTAL.
+package serviceconfig
+
+import (
+ "google.golang.org/grpc/internal"
+)
+
+// Config represents an opaque data structure holding a service config.
+type Config interface {
+ isConfig()
+}
+
+// LoadBalancingConfig represents an opaque data structure holding a load
+// balancer config.
+type LoadBalancingConfig interface {
+ isLoadBalancingConfig()
+}
+
+// Parse parses the JSON service config provided into an internal form or
+// returns an error if the config is invalid.
+func Parse(ServiceConfigJSON string) (Config, error) {
+ c, err := internal.ParseServiceConfig(ServiceConfigJSON)
+ if err != nil {
+ return nil, err
+ }
+ return c.(Config), err
+}
diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go
index ed36681..a1348e9 100644
--- a/vendor/google.golang.org/grpc/status/status.go
+++ b/vendor/google.golang.org/grpc/status/status.go
@@ -36,8 +36,15 @@
"github.com/golang/protobuf/ptypes"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/internal"
)
+func init() {
+ internal.StatusRawProto = statusRawProto
+}
+
+func statusRawProto(s *Status) *spb.Status { return s.s }
+
// statusError is an alias of a status proto. It implements error and Status,
// and a nil statusError should never be returned by this package.
type statusError spb.Status
@@ -51,6 +58,17 @@
return &Status{s: (*spb.Status)(se)}
}
+// Is implements future errors.Is functionality.
+// A statusError is equivalent if the code and message are identical.
+func (se *statusError) Is(target error) bool {
+ tse, ok := target.(*statusError)
+ if !ok {
+ return false
+ }
+
+ return proto.Equal((*spb.Status)(se), (*spb.Status)(tse))
+}
+
// Status represents an RPC status code, message, and details. It is immutable
// and should be created with New, Newf, or FromProto.
type Status struct {
@@ -125,7 +143,7 @@
// Status is returned with codes.Unknown and the original error message.
func FromError(err error) (s *Status, ok bool) {
if err == nil {
- return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
+ return nil, true
}
if se, ok := err.(interface {
GRPCStatus() *Status
@@ -199,7 +217,7 @@
func FromContextError(err error) *Status {
switch err {
case nil:
- return New(codes.OK, "")
+ return nil
case context.DeadlineExceeded:
return New(codes.DeadlineExceeded, err.Error())
case context.Canceled:
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index e10e623..134a624 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -30,7 +30,6 @@
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancerload"
@@ -328,13 +327,23 @@
return cs, nil
}
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
- cs.attempt = &csAttempt{
+// newAttemptLocked creates a new attempt with a transport.
+// If it succeeds, then it replaces clientStream's attempt with this new attempt.
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+ newAttempt := &csAttempt{
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
+ defer func() {
+ if retErr != nil {
+ // This attempt is not set in the clientStream, so its finish won't
+ // be called. Call it here for stats and trace in case they are not
+ // nil.
+ newAttempt.finish(retErr)
+ }
+ }()
if err := cs.ctx.Err(); err != nil {
return toRPCErr(err)
@@ -346,8 +355,9 @@
if trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
}
- cs.attempt.t = t
- cs.attempt.done = done
+ newAttempt.t = t
+ newAttempt.done = done
+ cs.attempt = newAttempt
return nil
}
@@ -396,11 +406,18 @@
serverHeaderBinlogged bool
mu sync.Mutex
- firstAttempt bool // if true, transparent retry is valid
- numRetries int // exclusive of transparent retry attempt(s)
- numRetriesSincePushback int // retries since pushback; to reset backoff
- finished bool // TODO: replace with atomic cmpxchg or sync.Once?
- attempt *csAttempt // the active client stream attempt
+ firstAttempt bool // if true, transparent retry is valid
+ numRetries int // exclusive of transparent retry attempt(s)
+ numRetriesSincePushback int // retries since pushback; to reset backoff
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ // attempt is the active client stream attempt.
+ // The only place where it is written is the newAttemptLocked method and this method never writes nil.
+ // So, attempt can be nil only inside newClientStream function when clientStream is first created.
+ // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
+ // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
+ // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
+ // place where we need to check if the attempt is nil.
+ attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
buffer []func(a *csAttempt) error // operations to replay on retry
@@ -458,8 +475,8 @@
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
}
- if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
- // First attempt, wait-for-ready, stream unprocessed: transparently retry.
+ if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+ // First attempt, stream unprocessed: transparently retry.
cs.firstAttempt = false
return nil
}
@@ -806,11 +823,11 @@
}
if cs.attempt != nil {
cs.attempt.finish(err)
- }
- // after functions all rely upon having a stream.
- if cs.attempt.s != nil {
- for _, o := range cs.opts {
- o.after(cs.callInfo)
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo)
+ }
}
}
cs.cancel()
@@ -965,19 +982,18 @@
a.mu.Unlock()
}
-func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
- ac.mu.Lock()
- if ac.transport != t {
- ac.mu.Unlock()
- return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
- }
- // transition to CONNECTING state when an attempt starts
- if ac.state != connectivity.Connecting {
- ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
- ac.mu.Unlock()
-
+// newNonRetryClientStream creates a ClientStream with the specified transport, on the
+// given addrConn.
+//
+// It's expected that the given transport is either the same one in addrConn, or
+// is already closed. To avoid race, transport is specified separately, instead
+// of using ac.transport.
+//
+// Main difference between this and ClientConn.NewStream:
+// - no retry
+// - no service config (or wait for service config)
+// - no tracing or stats
+func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
if t == nil {
// TODO: return RPC error here?
return nil, errors.New("transport provided is nil")
@@ -985,14 +1001,6 @@
// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
c := &callInfo{}
- for _, o := range opts {
- if err := o.before(c); err != nil {
- return nil, toRPCErr(err)
- }
- }
- c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
- c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
-
// Possible context leak:
// The cancel function for the child context we create will only be called
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
@@ -1005,6 +1013,13 @@
}
}()
+ for _, o := range opts {
+ if err := o.before(c); err != nil {
+ return nil, toRPCErr(err)
+ }
+ }
+ c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+ c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
if err := setCallInfoCodec(c); err != nil {
return nil, err
}
@@ -1037,6 +1052,7 @@
callHdr.Creds = c.creds
}
+ // Use a special addrConnStream to avoid retry.
as := &addrConnStream{
callHdr: callHdr,
ac: ac,
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 376f0b0..483ef89 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.21.0"
+const Version = "1.24.0"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index 11037b9..2d79b1c 100644
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -67,7 +67,7 @@
fi
# - Ensure all source files contain a copyright message.
-git ls-files "*.go" | xargs grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" 2>&1 | fail_on_output
+(! git grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" -- '*.go')
# - Make sure all tests in grpc and grpc/test use leakcheck via Teardown.
(! grep 'func Test[^(]' *_test.go)
@@ -75,10 +75,10 @@
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
-git ls-files "*.go" | xargs grep -l '"math/rand"' 2>&1 | (! grep -v '^examples\|^stress\|grpcrand\|wrr_test')
+git grep -l '"math/rand"' -- "*.go" 2>&1 | (! grep -v '^examples\|^stress\|grpcrand\|wrr_test')
# - Ensure all ptypes proto packages are renamed when importing.
-git ls-files "*.go" | (! xargs grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/")
+(! git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go")
# - Check imports that are illegal in appengine (until Go 1.11).
# TODO: Remove when we drop Go 1.10 support
@@ -86,7 +86,7 @@
# - gofmt, goimports, golint (with exceptions for generated code), go vet.
gofmt -s -d -l . 2>&1 | fail_on_output
-goimports -l . 2>&1 | (! grep -vE "(_mock|\.pb)\.go:") | fail_on_output
+goimports -l . 2>&1 | (! grep -vE "(_mock|\.pb)\.go") | fail_on_output
golint ./... 2>&1 | (! grep -vE "(_mock|\.pb)\.go:")
go vet -all .
@@ -105,12 +105,15 @@
fi
# - Collection of static analysis checks
-# TODO(menghanl): fix errors in transport_test.
+# TODO(dfawley): don't use deprecated functions in examples.
staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore '
google.golang.org/grpc/balancer.go:SA1019
+google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
-google.golang.org/grpc/balancer/xds/edsbalancer/balancergroup.go:SA1019
-google.golang.org/grpc/balancer/xds/xds.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
+google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019
+google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
google.golang.org/grpc/balancer_test.go:SA1019
google.golang.org/grpc/benchmark/benchmain/main.go:SA1019
@@ -118,10 +121,13 @@
google.golang.org/grpc/clientconn.go:S1024
google.golang.org/grpc/clientconn_state_transition_test.go:SA1019
google.golang.org/grpc/clientconn_test.go:SA1019
+google.golang.org/grpc/examples/features/debugging/client/main.go:SA1019
+google.golang.org/grpc/examples/features/load_balancing/client/main.go:SA1019
google.golang.org/grpc/internal/transport/handler_server.go:SA1019
google.golang.org/grpc/internal/transport/handler_server_test.go:SA1019
google.golang.org/grpc/resolver/dns/dns_resolver.go:SA1019
google.golang.org/grpc/stats/stats_test.go:SA1019
+google.golang.org/grpc/test/balancer_test.go:SA1019
google.golang.org/grpc/test/channelz_test.go:SA1019
google.golang.org/grpc/test/end2end_test.go:SA1019
google.golang.org/grpc/test/healthcheck_test.go:SA1019