gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/google.golang.org/grpc/internal/backoff/backoff.go b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
index 1bd0cce..5fc0ee3 100644
--- a/vendor/google.golang.org/grpc/internal/backoff/backoff.go
+++ b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
@@ -25,44 +25,39 @@
import (
"time"
+ grpcbackoff "google.golang.org/grpc/backoff"
"google.golang.org/grpc/internal/grpcrand"
)
// Strategy defines the methodology for backing off after a grpc connection
// failure.
-//
type Strategy interface {
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
Backoff(retries int) time.Duration
}
-const (
- // baseDelay is the amount of time to wait before retrying after the first
- // failure.
- baseDelay = 1.0 * time.Second
- // factor is applied to the backoff after each retry.
- factor = 1.6
- // jitter provides a range to randomize backoff delays.
- jitter = 0.2
-)
+// DefaultExponential is an exponential backoff implementation using the
+// default values for all the configurable knobs defined in
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+var DefaultExponential = Exponential{Config: grpcbackoff.DefaultConfig}
// Exponential implements exponential backoff algorithm as defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
type Exponential struct {
- // MaxDelay is the upper bound of backoff delay.
- MaxDelay time.Duration
+ // Config contains all options to configure the backoff algorithm.
+ Config grpcbackoff.Config
}
// Backoff returns the amount of time to wait before the next retry given the
// number of retries.
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {
- return baseDelay
+ return bc.Config.BaseDelay
}
- backoff, max := float64(baseDelay), float64(bc.MaxDelay)
+ backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
for backoff < max && retries > 0 {
- backoff *= factor
+ backoff *= bc.Config.Multiplier
retries--
}
if backoff > max {
@@ -70,7 +65,7 @@
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
- backoff *= 1 + jitter*(grpcrand.Float64()*2-1)
+ backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index fee6aec..4062c02 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -34,7 +34,7 @@
}
// binLogger is the global binary logger for the binary. One of this should be
-// built at init time from the configuration (environment varialbe or flags).
+// built at init time from the configuration (environment variable or flags).
//
// It is used to get a methodLogger for each individual method.
var binLogger Logger
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
index 4cc2525..be30d0e 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
@@ -43,7 +43,7 @@
// Foo.
//
// If two configs exist for one certain method or service, the one specified
-// later overrides the privous config.
+// later overrides the previous config.
func NewLoggerFromConfigString(s string) Logger {
if s == "" {
return nil
@@ -74,7 +74,7 @@
return fmt.Errorf("invalid config: %q, %v", config, err)
}
if m == "*" {
- return fmt.Errorf("invalid config: %q, %v", config, "* not allowd in blacklist config")
+ return fmt.Errorf("invalid config: %q, %v", config, "* not allowed in blacklist config")
}
if suffix != "" {
return fmt.Errorf("invalid config: %q, %v", config, "header/message limit not allowed in blacklist config")
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 20d044f..a2e7c34 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -63,7 +63,7 @@
// newWriterSink creates a binary log sink with the given writer.
//
-// Write() marshalls the proto message and writes it to the given writer. Each
+// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
new file mode 100644
index 0000000..2cb3109
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package buffer provides an implementation of an unbounded buffer.
+package buffer
+
+import "sync"
+
+// Unbounded is an implementation of an unbounded buffer which does not use
+// extra goroutines. This is typically used for passing updates from one entity
+// to another within gRPC.
+//
+// All methods on this type are thread-safe and don't block on anything except
+// the underlying mutex used for synchronization.
+type Unbounded struct {
+ c chan interface{}
+ mu sync.Mutex
+ backlog []interface{}
+}
+
+// NewUnbounded returns a new instance of Unbounded.
+func NewUnbounded() *Unbounded {
+ return &Unbounded{c: make(chan interface{}, 1)}
+}
+
+// Put adds t to the unbounded buffer.
+func (b *Unbounded) Put(t interface{}) {
+ b.mu.Lock()
+ if len(b.backlog) == 0 {
+ select {
+ case b.c <- t:
+ b.mu.Unlock()
+ return
+ default:
+ }
+ }
+ b.backlog = append(b.backlog, t)
+ b.mu.Unlock()
+}
+
+// Load sends the earliest buffered data, if any, onto the read channel
+// returned by Get(). Users are expected to call this every time they read a
+// value from the read channel.
+func (b *Unbounded) Load() {
+ b.mu.Lock()
+ if len(b.backlog) > 0 {
+ select {
+ case b.c <- b.backlog[0]:
+ b.backlog[0] = nil
+ b.backlog = b.backlog[1:]
+ default:
+ }
+ }
+ b.mu.Unlock()
+}
+
+// Get returns a read channel on which values added to the buffer, via Put(),
+// are sent on.
+//
+// Upon reading a value from this channel, users are expected to call Load() to
+// send the next buffered value onto the channel if there is any.
+func (b *Unbounded) Get() <-chan interface{} {
+ return b.c
+}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index bc1f99a..b96b359 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -28,9 +28,9 @@
)
var (
- // WithResolverBuilder is exported by dialoptions.go
+ // WithResolverBuilder is set by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
- // WithHealthCheckFunc is not exported by dialoptions.go
+ // WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
@@ -39,14 +39,17 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
- // ParseServiceConfig is a function to parse JSON service configs into
- // opaque data structures.
- ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
// the caller.
StatusRawProto interface{} // func (*status.Status) *spb.Status
+ // NewRequestInfoContext creates a new context based on the argument context attaching
+ // the passed in RequestInfo to the new context.
+ NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
+ // ParseServiceConfigForTesting is for creating a fake
+ // ClientConn for resolver testing only
+ ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
new file mode 100644
index 0000000..abc0f92
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -0,0 +1,460 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package dns implements a dns resolver to be installed as the default resolver
+// in grpc.
+package dns
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc/backoff"
+ "google.golang.org/grpc/grpclog"
+ internalbackoff "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/resolver"
+)
+
+func init() {
+ resolver.Register(NewBuilder())
+}
+
+const (
+ defaultPort = "443"
+ defaultFreq = time.Minute * 30
+ defaultDNSSvrPort = "53"
+ golang = "GO"
+ // txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
+ txtPrefix = "_grpc_config."
+ // In DNS, service config is encoded in a TXT record via the mechanism
+ // described in RFC-1464 using the attribute name grpc_config.
+ txtAttribute = "grpc_config="
+)
+
+var (
+ errMissingAddr = errors.New("dns resolver: missing address")
+
+ // Addresses ending with a colon that is supposed to be the separator
+ // between host and port is not allowed. E.g. "::" is a valid address as
+ // it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
+ // a colon as the host and port separator
+ errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
+)
+
+var (
+ defaultResolver netResolver = net.DefaultResolver
+ // To prevent excessive re-resolution, we enforce a rate limit on DNS
+ // resolution requests.
+ minDNSResRate = 30 * time.Second
+)
+
+var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) {
+ return func(ctx context.Context, network, address string) (net.Conn, error) {
+ var dialer net.Dialer
+ return dialer.DialContext(ctx, network, authority)
+ }
+}
+
+var customAuthorityResolver = func(authority string) (netResolver, error) {
+ host, port, err := parseTarget(authority, defaultDNSSvrPort)
+ if err != nil {
+ return nil, err
+ }
+
+ authorityWithPort := net.JoinHostPort(host, port)
+
+ return &net.Resolver{
+ PreferGo: true,
+ Dial: customAuthorityDialler(authorityWithPort),
+ }, nil
+}
+
+// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
+func NewBuilder() resolver.Builder {
+ return &dnsBuilder{minFreq: defaultFreq}
+}
+
+type dnsBuilder struct {
+ // minimum frequency of polling the DNS server.
+ minFreq time.Duration
+}
+
+// Build creates and starts a DNS resolver that watches the name resolution of the target.
+func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+ host, port, err := parseTarget(target.Endpoint, defaultPort)
+ if err != nil {
+ return nil, err
+ }
+
+ // IP address.
+ if net.ParseIP(host) != nil {
+ host, _ = formatIP(host)
+ addr := []resolver.Address{{Addr: host + ":" + port}}
+ i := &ipResolver{
+ cc: cc,
+ ip: addr,
+ rn: make(chan struct{}, 1),
+ q: make(chan struct{}),
+ }
+ cc.NewAddress(addr)
+ go i.watcher()
+ return i, nil
+ }
+
+ // DNS address (non-IP).
+ ctx, cancel := context.WithCancel(context.Background())
+ bc := backoff.DefaultConfig
+ bc.MaxDelay = b.minFreq
+ d := &dnsResolver{
+ freq: b.minFreq,
+ backoff: internalbackoff.Exponential{Config: bc},
+ host: host,
+ port: port,
+ ctx: ctx,
+ cancel: cancel,
+ cc: cc,
+ t: time.NewTimer(0),
+ rn: make(chan struct{}, 1),
+ disableServiceConfig: opts.DisableServiceConfig,
+ }
+
+ if target.Authority == "" {
+ d.resolver = defaultResolver
+ } else {
+ d.resolver, err = customAuthorityResolver(target.Authority)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ d.wg.Add(1)
+ go d.watcher()
+ return d, nil
+}
+
+// Scheme returns the naming scheme of this resolver builder, which is "dns".
+func (b *dnsBuilder) Scheme() string {
+ return "dns"
+}
+
+type netResolver interface {
+ LookupHost(ctx context.Context, host string) (addrs []string, err error)
+ LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
+ LookupTXT(ctx context.Context, name string) (txts []string, err error)
+}
+
+// ipResolver watches for the name resolution update for an IP address.
+type ipResolver struct {
+ cc resolver.ClientConn
+ ip []resolver.Address
+ // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ rn chan struct{}
+ q chan struct{}
+}
+
+// ResolveNow resend the address it stores, no resolution is needed.
+func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
+ select {
+ case i.rn <- struct{}{}:
+ default:
+ }
+}
+
+// Close closes the ipResolver.
+func (i *ipResolver) Close() {
+ close(i.q)
+}
+
+func (i *ipResolver) watcher() {
+ for {
+ select {
+ case <-i.rn:
+ i.cc.NewAddress(i.ip)
+ case <-i.q:
+ return
+ }
+ }
+}
+
+// dnsResolver watches for the name resolution update for a non-IP target.
+type dnsResolver struct {
+ freq time.Duration
+ backoff internalbackoff.Exponential
+ retryCount int
+ host string
+ port string
+ resolver netResolver
+ ctx context.Context
+ cancel context.CancelFunc
+ cc resolver.ClientConn
+ // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ rn chan struct{}
+ t *time.Timer
+ // wg is used to enforce Close() to return after the watcher() goroutine has finished.
+ // Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
+ // replace the real lookup functions with mocked ones to facilitate testing.
+ // If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
+ // will warns lookup (READ the lookup function pointers) inside watcher() goroutine
+ // has data race with replaceNetFunc (WRITE the lookup function pointers).
+ wg sync.WaitGroup
+ disableServiceConfig bool
+}
+
+// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
+func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
+ select {
+ case d.rn <- struct{}{}:
+ default:
+ }
+}
+
+// Close closes the dnsResolver.
+func (d *dnsResolver) Close() {
+ d.cancel()
+ d.wg.Wait()
+ d.t.Stop()
+}
+
+func (d *dnsResolver) watcher() {
+ defer d.wg.Done()
+ for {
+ select {
+ case <-d.ctx.Done():
+ return
+ case <-d.t.C:
+ case <-d.rn:
+ if !d.t.Stop() {
+ // Before resetting a timer, it should be stopped to prevent racing with
+ // reads on it's channel.
+ <-d.t.C
+ }
+ }
+
+ result, sc := d.lookup()
+ // Next lookup should happen within an interval defined by d.freq. It may be
+ // more often due to exponential retry on empty address list.
+ if len(result) == 0 {
+ d.retryCount++
+ d.t.Reset(d.backoff.Backoff(d.retryCount))
+ } else {
+ d.retryCount = 0
+ d.t.Reset(d.freq)
+ }
+ d.cc.NewServiceConfig(sc)
+ d.cc.NewAddress(result)
+
+ // Sleep to prevent excessive re-resolutions. Incoming resolution requests
+ // will be queued in d.rn.
+ t := time.NewTimer(minDNSResRate)
+ select {
+ case <-t.C:
+ case <-d.ctx.Done():
+ t.Stop()
+ return
+ }
+ }
+}
+
+func (d *dnsResolver) lookupSRV() []resolver.Address {
+ var newAddrs []resolver.Address
+ _, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
+ if err != nil {
+ grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
+ return nil
+ }
+ for _, s := range srvs {
+ lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
+ if err != nil {
+ grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
+ continue
+ }
+ for _, a := range lbAddrs {
+ a, ok := formatIP(a)
+ if !ok {
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
+ }
+ addr := a + ":" + strconv.Itoa(int(s.Port))
+ newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
+ }
+ }
+ return newAddrs
+}
+
+func (d *dnsResolver) lookupTXT() string {
+ ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
+ if err != nil {
+ grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
+ return ""
+ }
+ var res string
+ for _, s := range ss {
+ res += s
+ }
+
+ // TXT record must have "grpc_config=" attribute in order to be used as service config.
+ if !strings.HasPrefix(res, txtAttribute) {
+ grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
+ return ""
+ }
+ return strings.TrimPrefix(res, txtAttribute)
+}
+
+func (d *dnsResolver) lookupHost() []resolver.Address {
+ var newAddrs []resolver.Address
+ addrs, err := d.resolver.LookupHost(d.ctx, d.host)
+ if err != nil {
+ grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
+ return nil
+ }
+ for _, a := range addrs {
+ a, ok := formatIP(a)
+ if !ok {
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
+ }
+ addr := a + ":" + d.port
+ newAddrs = append(newAddrs, resolver.Address{Addr: addr})
+ }
+ return newAddrs
+}
+
+func (d *dnsResolver) lookup() ([]resolver.Address, string) {
+ newAddrs := d.lookupSRV()
+ // Support fallback to non-balancer address.
+ newAddrs = append(newAddrs, d.lookupHost()...)
+ if d.disableServiceConfig {
+ return newAddrs, ""
+ }
+ sc := d.lookupTXT()
+ return newAddrs, canaryingSC(sc)
+}
+
+// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
+// If addr is an IPv4 address, return the addr and ok = true.
+// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true.
+func formatIP(addr string) (addrIP string, ok bool) {
+ ip := net.ParseIP(addr)
+ if ip == nil {
+ return "", false
+ }
+ if ip.To4() != nil {
+ return addr, true
+ }
+ return "[" + addr + "]", true
+}
+
+// parseTarget takes the user input target string and default port, returns formatted host and port info.
+// If target doesn't specify a port, set the port to be the defaultPort.
+// If target is in IPv6 format and host-name is enclosed in square brackets, brackets
+// are stripped when setting the host.
+// examples:
+// target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"
+// target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80"
+// target: "[ipv6-host]" defaultPort: "443" returns host: "ipv6-host", port: "443"
+// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
+func parseTarget(target, defaultPort string) (host, port string, err error) {
+ if target == "" {
+ return "", "", errMissingAddr
+ }
+ if ip := net.ParseIP(target); ip != nil {
+ // target is an IPv4 or IPv6(without brackets) address
+ return target, defaultPort, nil
+ }
+ if host, port, err = net.SplitHostPort(target); err == nil {
+ if port == "" {
+ // If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error.
+ return "", "", errEndsWithColon
+ }
+ // target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
+ if host == "" {
+ // Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
+ host = "localhost"
+ }
+ return host, port, nil
+ }
+ if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
+ // target doesn't have port
+ return host, port, nil
+ }
+ return "", "", fmt.Errorf("invalid target address %v, error info: %v", target, err)
+}
+
+type rawChoice struct {
+ ClientLanguage *[]string `json:"clientLanguage,omitempty"`
+ Percentage *int `json:"percentage,omitempty"`
+ ClientHostName *[]string `json:"clientHostName,omitempty"`
+ ServiceConfig *json.RawMessage `json:"serviceConfig,omitempty"`
+}
+
+func containsString(a *[]string, b string) bool {
+ if a == nil {
+ return true
+ }
+ for _, c := range *a {
+ if c == b {
+ return true
+ }
+ }
+ return false
+}
+
+func chosenByPercentage(a *int) bool {
+ if a == nil {
+ return true
+ }
+ return grpcrand.Intn(100)+1 <= *a
+}
+
+func canaryingSC(js string) string {
+ if js == "" {
+ return ""
+ }
+ var rcs []rawChoice
+ err := json.Unmarshal([]byte(js), &rcs)
+ if err != nil {
+ grpclog.Warningf("grpc: failed to parse service config json string due to %v.\n", err)
+ return ""
+ }
+ cliHostname, err := os.Hostname()
+ if err != nil {
+ grpclog.Warningf("grpc: failed to get client hostname due to %v.\n", err)
+ return ""
+ }
+ var sc string
+ for _, c := range rcs {
+ if !containsString(c.ClientLanguage, golang) ||
+ !chosenByPercentage(c.Percentage) ||
+ !containsString(c.ClientHostName, cliHostname) ||
+ c.ServiceConfig == nil {
+ continue
+ }
+ sc = string(*c.ServiceConfig)
+ break
+ }
+ return sc
+}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
new file mode 100644
index 0000000..893d5d1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package passthrough implements a pass-through resolver. It sends the target
+// name without scheme back to gRPC as resolved address.
+package passthrough
+
+import "google.golang.org/grpc/resolver"
+
+const scheme = "passthrough"
+
+type passthroughBuilder struct{}
+
+func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+ r := &passthroughResolver{
+ target: target,
+ cc: cc,
+ }
+ r.start()
+ return r, nil
+}
+
+func (*passthroughBuilder) Scheme() string {
+ return scheme
+}
+
+type passthroughResolver struct {
+ target resolver.Target
+ cc resolver.ClientConn
+}
+
+func (r *passthroughResolver) start() {
+ r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
+}
+
+func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}
+
+func (*passthroughResolver) Close() {}
+
+func init() {
+ resolver.Register(&passthroughBuilder{})
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 9bd8c27..294661a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -35,6 +35,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
@@ -46,6 +47,7 @@
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
+ lastRead int64 // keep this field 64-bit aligned
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -75,9 +77,6 @@
perRPCCreds []credentials.PerRPCCredentials
- // Boolean to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
@@ -352,6 +351,7 @@
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
+ ct: t,
done: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
@@ -385,23 +385,23 @@
}
func (t *http2Client) getPeer() *peer.Peer {
- pr := &peer.Peer{
- Addr: t.remoteAddr,
+ return &peer.Peer{
+ Addr: t.remoteAddr,
+ AuthInfo: t.authInfo,
}
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- return pr
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
- authData, err := t.getTrAuthData(ctx, aud)
+ ri := credentials.RequestInfo{
+ Method: callHdr.Method,
+ }
+ ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
+ authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
}
- callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
+ callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
if err != nil {
return nil, err
}
@@ -1191,6 +1191,7 @@
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
+ s.headerValid = true
if !endStream {
// HEADERS frame block carries a Response-Headers.
isHeader = true
@@ -1233,7 +1234,7 @@
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
@@ -1248,7 +1249,7 @@
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
@@ -1292,17 +1293,41 @@
}
}
+func minTime(a, b time.Duration) time.Duration {
+ if a < b {
+ return a
+ }
+ return b
+}
+
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
+ // True iff a ping has been sent, and no data has been received since then.
+ outstandingPing := false
+ // Amount of time remaining before which we should receive an ACK for the
+ // last sent ping.
+ timeoutLeft := time.Duration(0)
+ // Records the last value of t.lastRead before we go block on the timer.
+ // This is required to check for read activity since then.
+ prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
+ lastRead := atomic.LoadInt64(&t.lastRead)
+ if lastRead > prevNano {
+ // There has been read activity since the last time we were here.
+ outstandingPing = false
+ // Next timer should fire at kp.Time seconds from lastRead time.
+ timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+ prevNano = lastRead
continue
}
+ if outstandingPing && timeoutLeft <= 0 {
+ t.Close()
+ return
+ }
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
@@ -1315,36 +1340,37 @@
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+ // If a ping was sent out previously (because there were active
+ // streams at that point) which wasn't acked and its timeout
+ // hadn't fired, but we got here and are about to go dormant,
+ // we should make sure that we unconditionally send a ping once
+ // we awaken.
+ outstandingPing = false
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
- }
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
- t.controlBuf.put(p)
-
- timer.Reset(t.kp.Timeout)
- select {
- case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
- continue
+ if !outstandingPing {
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
}
- infof("transport: closing client transport due to idleness.")
- t.Close()
- return
- case <-t.ctx.Done():
- if !timer.Stop() {
- <-timer.C
- }
- return
+ t.controlBuf.put(p)
+ timeoutLeft = t.kp.Timeout
+ outstandingPing = true
}
+ // The amount of time to sleep here is the minimum of kp.Time and
+ // timeoutLeft. This will ensure that we wait only for kp.Time
+ // before sending out the next ping (for cases where the ping is
+ // acked).
+ sleepDuration := minTime(t.kp.Time, timeoutLeft)
+ timeoutLeft -= sleepDuration
+ prevNano = lastRead
+ timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 33686a1..0760383 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -174,6 +174,12 @@
Val: *config.MaxHeaderListSize,
})
}
+ if config.HeaderTableSize != nil {
+ isettings = append(isettings, http2.Setting{
+ ID: http2.SettingHeaderTableSize,
+ Val: *config.HeaderTableSize,
+ })
+ }
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
@@ -751,7 +757,7 @@
return true
}
-// WriteHeader sends the header metedata md back to the client.
+// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 1c1d106..bfab940 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -233,6 +233,7 @@
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
+ ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
@@ -251,6 +252,10 @@
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ // headerValid indicates whether a valid header was received. Only
+ // meaningful after headerChan is closed (always call waitOnHeader() before
+ // reading its value). Not valid on server side.
+ headerValid bool
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -303,34 +308,28 @@
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
-func (s *Stream) waitOnHeader() error {
+func (s *Stream) waitOnHeader() {
if s.headerChan == nil {
// On the server headerChan is always nil since a stream originates
// only after having received headers.
- return nil
+ return
}
select {
case <-s.ctx.Done():
- // We prefer success over failure when reading messages because we delay
- // context error in stream.Read(). To keep behavior consistent, we also
- // prefer success here.
- select {
- case <-s.headerChan:
- return nil
- default:
- }
- return ContextErr(s.ctx.Err())
+ // Close the stream to prevent headers/trailers from changing after
+ // this function returns.
+ s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
+ // headerChan could possibly not be closed yet if closeStream raced
+ // with operateHeaders; wait until it is closed explicitly here.
+ <-s.headerChan
case <-s.headerChan:
- return nil
}
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
- if err := s.waitOnHeader(); err != nil {
- return ""
- }
+ s.waitOnHeader()
return s.recvCompress
}
@@ -351,36 +350,27 @@
// available. It blocks until i) the metadata is ready or ii) there is no header
// metadata or iii) the stream is canceled/expired.
//
-// On server side, it returns the out header after t.WriteHeader is called.
+// On server side, it returns the out header after t.WriteHeader is called. It
+// does not block and must not be called until after WriteHeader.
func (s *Stream) Header() (metadata.MD, error) {
- if s.headerChan == nil && s.header != nil {
+ if s.headerChan == nil {
// On server side, return the header in stream. It will be the out
// header after t.WriteHeader is called.
return s.header.Copy(), nil
}
- err := s.waitOnHeader()
- // Even if the stream is closed, header is returned if available.
- select {
- case <-s.headerChan:
- if s.header == nil {
- return nil, nil
- }
- return s.header.Copy(), nil
- default:
+ s.waitOnHeader()
+ if !s.headerValid {
+ return nil, s.status.Err()
}
- return nil, err
+ return s.header.Copy(), nil
}
// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only. If the stream ends
-// before headers are received, returns true, nil. If a context error happens
-// first, returns it as a status error. Client-side only.
-func (s *Stream) TrailersOnly() (bool, error) {
- err := s.waitOnHeader()
- if err != nil {
- return false, err
- }
- return s.noHeaders, nil
+// before headers are received, returns true, nil. Client-side only.
+func (s *Stream) TrailersOnly() bool {
+ s.waitOnHeader()
+ return s.noHeaders
}
// Trailer returns the cached trailer metedata. Note that if it is not called
@@ -534,6 +524,7 @@
ReadBufferSize int
ChannelzParentID int64
MaxHeaderListSize *uint32
+ HeaderTableSize *uint32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error