interim commit go fmt stuff
Change-Id: I617a7d771b50c2b7999eabbbebbabef4e68d8713
diff --git a/vendor/google.golang.org/grpc/internal/backoff/backoff.go b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
index 1bd0cce..5fc0ee3 100644
--- a/vendor/google.golang.org/grpc/internal/backoff/backoff.go
+++ b/vendor/google.golang.org/grpc/internal/backoff/backoff.go
@@ -25,44 +25,39 @@
import (
"time"
+ grpcbackoff "google.golang.org/grpc/backoff"
"google.golang.org/grpc/internal/grpcrand"
)
// Strategy defines the methodology for backing off after a grpc connection
// failure.
-//
type Strategy interface {
// Backoff returns the amount of time to wait before the next retry given
// the number of consecutive failures.
Backoff(retries int) time.Duration
}
-const (
- // baseDelay is the amount of time to wait before retrying after the first
- // failure.
- baseDelay = 1.0 * time.Second
- // factor is applied to the backoff after each retry.
- factor = 1.6
- // jitter provides a range to randomize backoff delays.
- jitter = 0.2
-)
+// DefaultExponential is an exponential backoff implementation using the
+// default values for all the configurable knobs defined in
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
+var DefaultExponential = Exponential{Config: grpcbackoff.DefaultConfig}
// Exponential implements exponential backoff algorithm as defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
type Exponential struct {
- // MaxDelay is the upper bound of backoff delay.
- MaxDelay time.Duration
+ // Config contains all options to configure the backoff algorithm.
+ Config grpcbackoff.Config
}
// Backoff returns the amount of time to wait before the next retry given the
// number of retries.
func (bc Exponential) Backoff(retries int) time.Duration {
if retries == 0 {
- return baseDelay
+ return bc.Config.BaseDelay
}
- backoff, max := float64(baseDelay), float64(bc.MaxDelay)
+ backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
for backoff < max && retries > 0 {
- backoff *= factor
+ backoff *= bc.Config.Multiplier
retries--
}
if backoff > max {
@@ -70,7 +65,7 @@
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
- backoff *= 1 + jitter*(grpcrand.Float64()*2-1)
+ backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
if backoff < 0 {
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index fee6aec..4062c02 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -34,7 +34,7 @@
}
// binLogger is the global binary logger for the binary. One of this should be
-// built at init time from the configuration (environment varialbe or flags).
+// built at init time from the configuration (environment variable or flags).
//
// It is used to get a methodLogger for each individual method.
var binLogger Logger
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
index 4cc2525..be30d0e 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/env_config.go
@@ -43,7 +43,7 @@
// Foo.
//
// If two configs exist for one certain method or service, the one specified
-// later overrides the privous config.
+// later overrides the previous config.
func NewLoggerFromConfigString(s string) Logger {
if s == "" {
return nil
@@ -74,7 +74,7 @@
return fmt.Errorf("invalid config: %q, %v", config, err)
}
if m == "*" {
- return fmt.Errorf("invalid config: %q, %v", config, "* not allowd in blacklist config")
+ return fmt.Errorf("invalid config: %q, %v", config, "* not allowed in blacklist config")
}
if suffix != "" {
return fmt.Errorf("invalid config: %q, %v", config, "header/message limit not allowed in blacklist config")
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh b/vendor/google.golang.org/grpc/internal/binarylog/regenerate.sh
old mode 100755
new mode 100644
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 20d044f..a2e7c34 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -63,7 +63,7 @@
// newWriterSink creates a binary log sink with the given writer.
//
-// Write() marshalls the proto message and writes it to the given writer. Each
+// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffer is done, Close() doesn't try to close the writer.
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
new file mode 100644
index 0000000..2cb3109
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package buffer provides an implementation of an unbounded buffer.
+package buffer
+
+import "sync"
+
+// Unbounded is an implementation of an unbounded buffer which does not use
+// extra goroutines. This is typically used for passing updates from one entity
+// to another within gRPC.
+//
+// All methods on this type are thread-safe and don't block on anything except
+// the underlying mutex used for synchronization.
+type Unbounded struct {
+ c chan interface{}
+ mu sync.Mutex
+ backlog []interface{}
+}
+
+// NewUnbounded returns a new instance of Unbounded.
+func NewUnbounded() *Unbounded {
+ return &Unbounded{c: make(chan interface{}, 1)}
+}
+
+// Put adds t to the unbounded buffer.
+func (b *Unbounded) Put(t interface{}) {
+ b.mu.Lock()
+ if len(b.backlog) == 0 {
+ select {
+ case b.c <- t:
+ b.mu.Unlock()
+ return
+ default:
+ }
+ }
+ b.backlog = append(b.backlog, t)
+ b.mu.Unlock()
+}
+
+// Load sends the earliest buffered data, if any, onto the read channel
+// returned by Get(). Users are expected to call this every time they read a
+// value from the read channel.
+func (b *Unbounded) Load() {
+ b.mu.Lock()
+ if len(b.backlog) > 0 {
+ select {
+ case b.c <- b.backlog[0]:
+ b.backlog[0] = nil
+ b.backlog = b.backlog[1:]
+ default:
+ }
+ }
+ b.mu.Unlock()
+}
+
+// Get returns a read channel on which values added to the buffer, via Put(),
+// are sent on.
+//
+// Upon reading a value from this channel, users are expected to call Load() to
+// send the next buffered value onto the channel if there is any.
+func (b *Unbounded) Get() <-chan interface{} {
+ return b.c
+}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index bc1f99a..b96b359 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -28,9 +28,9 @@
)
var (
- // WithResolverBuilder is exported by dialoptions.go
+ // WithResolverBuilder is set by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
- // WithHealthCheckFunc is not exported by dialoptions.go
+ // WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
@@ -39,14 +39,17 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
- // ParseServiceConfig is a function to parse JSON service configs into
- // opaque data structures.
- ParseServiceConfig func(sc string) (interface{}, error)
// StatusRawProto is exported by status/status.go. This func returns a
// pointer to the wrapped Status proto for a given status.Status without a
// call to proto.Clone(). The returned Status proto should not be mutated by
// the caller.
StatusRawProto interface{} // func (*status.Status) *spb.Status
+ // NewRequestInfoContext creates a new context based on the argument context attaching
+ // the passed in RequestInfo to the new context.
+ NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
+ // ParseServiceConfigForTesting is for creating a fake
+ // ClientConn for resolver testing only
+ ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
new file mode 100644
index 0000000..abc0f92
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -0,0 +1,460 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package dns implements a dns resolver to be installed as the default resolver
+// in grpc.
+package dns
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc/backoff"
+ "google.golang.org/grpc/grpclog"
+ internalbackoff "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/resolver"
+)
+
+func init() {
+ resolver.Register(NewBuilder())
+}
+
+const (
+ defaultPort = "443"
+ defaultFreq = time.Minute * 30
+ defaultDNSSvrPort = "53"
+ golang = "GO"
+ // txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
+ txtPrefix = "_grpc_config."
+ // In DNS, service config is encoded in a TXT record via the mechanism
+ // described in RFC-1464 using the attribute name grpc_config.
+ txtAttribute = "grpc_config="
+)
+
+var (
+ errMissingAddr = errors.New("dns resolver: missing address")
+
+ // Addresses ending with a colon that is supposed to be the separator
+ // between host and port is not allowed. E.g. "::" is a valid address as
+ // it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
+ // a colon as the host and port separator
+ errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
+)
+
+var (
+ defaultResolver netResolver = net.DefaultResolver
+ // To prevent excessive re-resolution, we enforce a rate limit on DNS
+ // resolution requests.
+ minDNSResRate = 30 * time.Second
+)
+
+var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) {
+ return func(ctx context.Context, network, address string) (net.Conn, error) {
+ var dialer net.Dialer
+ return dialer.DialContext(ctx, network, authority)
+ }
+}
+
+var customAuthorityResolver = func(authority string) (netResolver, error) {
+ host, port, err := parseTarget(authority, defaultDNSSvrPort)
+ if err != nil {
+ return nil, err
+ }
+
+ authorityWithPort := net.JoinHostPort(host, port)
+
+ return &net.Resolver{
+ PreferGo: true,
+ Dial: customAuthorityDialler(authorityWithPort),
+ }, nil
+}
+
+// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
+func NewBuilder() resolver.Builder {
+ return &dnsBuilder{minFreq: defaultFreq}
+}
+
+type dnsBuilder struct {
+ // minimum frequency of polling the DNS server.
+ minFreq time.Duration
+}
+
+// Build creates and starts a DNS resolver that watches the name resolution of the target.
+func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+ host, port, err := parseTarget(target.Endpoint, defaultPort)
+ if err != nil {
+ return nil, err
+ }
+
+ // IP address.
+ if net.ParseIP(host) != nil {
+ host, _ = formatIP(host)
+ addr := []resolver.Address{{Addr: host + ":" + port}}
+ i := &ipResolver{
+ cc: cc,
+ ip: addr,
+ rn: make(chan struct{}, 1),
+ q: make(chan struct{}),
+ }
+ cc.NewAddress(addr)
+ go i.watcher()
+ return i, nil
+ }
+
+ // DNS address (non-IP).
+ ctx, cancel := context.WithCancel(context.Background())
+ bc := backoff.DefaultConfig
+ bc.MaxDelay = b.minFreq
+ d := &dnsResolver{
+ freq: b.minFreq,
+ backoff: internalbackoff.Exponential{Config: bc},
+ host: host,
+ port: port,
+ ctx: ctx,
+ cancel: cancel,
+ cc: cc,
+ t: time.NewTimer(0),
+ rn: make(chan struct{}, 1),
+ disableServiceConfig: opts.DisableServiceConfig,
+ }
+
+ if target.Authority == "" {
+ d.resolver = defaultResolver
+ } else {
+ d.resolver, err = customAuthorityResolver(target.Authority)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ d.wg.Add(1)
+ go d.watcher()
+ return d, nil
+}
+
+// Scheme returns the naming scheme of this resolver builder, which is "dns".
+func (b *dnsBuilder) Scheme() string {
+ return "dns"
+}
+
+type netResolver interface {
+ LookupHost(ctx context.Context, host string) (addrs []string, err error)
+ LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
+ LookupTXT(ctx context.Context, name string) (txts []string, err error)
+}
+
+// ipResolver watches for the name resolution update for an IP address.
+type ipResolver struct {
+ cc resolver.ClientConn
+ ip []resolver.Address
+ // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ rn chan struct{}
+ q chan struct{}
+}
+
+// ResolveNow resend the address it stores, no resolution is needed.
+func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) {
+ select {
+ case i.rn <- struct{}{}:
+ default:
+ }
+}
+
+// Close closes the ipResolver.
+func (i *ipResolver) Close() {
+ close(i.q)
+}
+
+func (i *ipResolver) watcher() {
+ for {
+ select {
+ case <-i.rn:
+ i.cc.NewAddress(i.ip)
+ case <-i.q:
+ return
+ }
+ }
+}
+
+// dnsResolver watches for the name resolution update for a non-IP target.
+type dnsResolver struct {
+ freq time.Duration
+ backoff internalbackoff.Exponential
+ retryCount int
+ host string
+ port string
+ resolver netResolver
+ ctx context.Context
+ cancel context.CancelFunc
+ cc resolver.ClientConn
+ // rn channel is used by ResolveNow() to force an immediate resolution of the target.
+ rn chan struct{}
+ t *time.Timer
+ // wg is used to enforce Close() to return after the watcher() goroutine has finished.
+ // Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
+ // replace the real lookup functions with mocked ones to facilitate testing.
+ // If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
+ // will warns lookup (READ the lookup function pointers) inside watcher() goroutine
+ // has data race with replaceNetFunc (WRITE the lookup function pointers).
+ wg sync.WaitGroup
+ disableServiceConfig bool
+}
+
+// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
+func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOption) {
+ select {
+ case d.rn <- struct{}{}:
+ default:
+ }
+}
+
+// Close closes the dnsResolver.
+func (d *dnsResolver) Close() {
+ d.cancel()
+ d.wg.Wait()
+ d.t.Stop()
+}
+
+func (d *dnsResolver) watcher() {
+ defer d.wg.Done()
+ for {
+ select {
+ case <-d.ctx.Done():
+ return
+ case <-d.t.C:
+ case <-d.rn:
+ if !d.t.Stop() {
+ // Before resetting a timer, it should be stopped to prevent racing with
+ // reads on it's channel.
+ <-d.t.C
+ }
+ }
+
+ result, sc := d.lookup()
+ // Next lookup should happen within an interval defined by d.freq. It may be
+ // more often due to exponential retry on empty address list.
+ if len(result) == 0 {
+ d.retryCount++
+ d.t.Reset(d.backoff.Backoff(d.retryCount))
+ } else {
+ d.retryCount = 0
+ d.t.Reset(d.freq)
+ }
+ d.cc.NewServiceConfig(sc)
+ d.cc.NewAddress(result)
+
+ // Sleep to prevent excessive re-resolutions. Incoming resolution requests
+ // will be queued in d.rn.
+ t := time.NewTimer(minDNSResRate)
+ select {
+ case <-t.C:
+ case <-d.ctx.Done():
+ t.Stop()
+ return
+ }
+ }
+}
+
+func (d *dnsResolver) lookupSRV() []resolver.Address {
+ var newAddrs []resolver.Address
+ _, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
+ if err != nil {
+ grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
+ return nil
+ }
+ for _, s := range srvs {
+ lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
+ if err != nil {
+ grpclog.Infof("grpc: failed load balancer address dns lookup due to %v.\n", err)
+ continue
+ }
+ for _, a := range lbAddrs {
+ a, ok := formatIP(a)
+ if !ok {
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
+ }
+ addr := a + ":" + strconv.Itoa(int(s.Port))
+ newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target})
+ }
+ }
+ return newAddrs
+}
+
+func (d *dnsResolver) lookupTXT() string {
+ ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
+ if err != nil {
+ grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
+ return ""
+ }
+ var res string
+ for _, s := range ss {
+ res += s
+ }
+
+ // TXT record must have "grpc_config=" attribute in order to be used as service config.
+ if !strings.HasPrefix(res, txtAttribute) {
+ grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
+ return ""
+ }
+ return strings.TrimPrefix(res, txtAttribute)
+}
+
+func (d *dnsResolver) lookupHost() []resolver.Address {
+ var newAddrs []resolver.Address
+ addrs, err := d.resolver.LookupHost(d.ctx, d.host)
+ if err != nil {
+ grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
+ return nil
+ }
+ for _, a := range addrs {
+ a, ok := formatIP(a)
+ if !ok {
+ grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
+ continue
+ }
+ addr := a + ":" + d.port
+ newAddrs = append(newAddrs, resolver.Address{Addr: addr})
+ }
+ return newAddrs
+}
+
+func (d *dnsResolver) lookup() ([]resolver.Address, string) {
+ newAddrs := d.lookupSRV()
+ // Support fallback to non-balancer address.
+ newAddrs = append(newAddrs, d.lookupHost()...)
+ if d.disableServiceConfig {
+ return newAddrs, ""
+ }
+ sc := d.lookupTXT()
+ return newAddrs, canaryingSC(sc)
+}
+
+// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
+// If addr is an IPv4 address, return the addr and ok = true.
+// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true.
+func formatIP(addr string) (addrIP string, ok bool) {
+ ip := net.ParseIP(addr)
+ if ip == nil {
+ return "", false
+ }
+ if ip.To4() != nil {
+ return addr, true
+ }
+ return "[" + addr + "]", true
+}
+
+// parseTarget takes the user input target string and default port, returns formatted host and port info.
+// If target doesn't specify a port, set the port to be the defaultPort.
+// If target is in IPv6 format and host-name is enclosed in square brackets, brackets
+// are stripped when setting the host.
+// examples:
+// target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"
+// target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80"
+// target: "[ipv6-host]" defaultPort: "443" returns host: "ipv6-host", port: "443"
+// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
+func parseTarget(target, defaultPort string) (host, port string, err error) {
+ if target == "" {
+ return "", "", errMissingAddr
+ }
+ if ip := net.ParseIP(target); ip != nil {
+ // target is an IPv4 or IPv6(without brackets) address
+ return target, defaultPort, nil
+ }
+ if host, port, err = net.SplitHostPort(target); err == nil {
+ if port == "" {
+ // If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error.
+ return "", "", errEndsWithColon
+ }
+ // target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
+ if host == "" {
+ // Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
+ host = "localhost"
+ }
+ return host, port, nil
+ }
+ if host, port, err = net.SplitHostPort(target + ":" + defaultPort); err == nil {
+ // target doesn't have port
+ return host, port, nil
+ }
+ return "", "", fmt.Errorf("invalid target address %v, error info: %v", target, err)
+}
+
+type rawChoice struct {
+ ClientLanguage *[]string `json:"clientLanguage,omitempty"`
+ Percentage *int `json:"percentage,omitempty"`
+ ClientHostName *[]string `json:"clientHostName,omitempty"`
+ ServiceConfig *json.RawMessage `json:"serviceConfig,omitempty"`
+}
+
+func containsString(a *[]string, b string) bool {
+ if a == nil {
+ return true
+ }
+ for _, c := range *a {
+ if c == b {
+ return true
+ }
+ }
+ return false
+}
+
+func chosenByPercentage(a *int) bool {
+ if a == nil {
+ return true
+ }
+ return grpcrand.Intn(100)+1 <= *a
+}
+
+func canaryingSC(js string) string {
+ if js == "" {
+ return ""
+ }
+ var rcs []rawChoice
+ err := json.Unmarshal([]byte(js), &rcs)
+ if err != nil {
+ grpclog.Warningf("grpc: failed to parse service config json string due to %v.\n", err)
+ return ""
+ }
+ cliHostname, err := os.Hostname()
+ if err != nil {
+ grpclog.Warningf("grpc: failed to get client hostname due to %v.\n", err)
+ return ""
+ }
+ var sc string
+ for _, c := range rcs {
+ if !containsString(c.ClientLanguage, golang) ||
+ !chosenByPercentage(c.Percentage) ||
+ !containsString(c.ClientHostName, cliHostname) ||
+ c.ServiceConfig == nil {
+ continue
+ }
+ sc = string(*c.ServiceConfig)
+ break
+ }
+ return sc
+}
diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
new file mode 100644
index 0000000..893d5d1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package passthrough implements a pass-through resolver. It sends the target
+// name without scheme back to gRPC as resolved address.
+package passthrough
+
+import "google.golang.org/grpc/resolver"
+
+const scheme = "passthrough"
+
+type passthroughBuilder struct{}
+
+func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+ r := &passthroughResolver{
+ target: target,
+ cc: cc,
+ }
+ r.start()
+ return r, nil
+}
+
+func (*passthroughBuilder) Scheme() string {
+ return scheme
+}
+
+type passthroughResolver struct {
+ target resolver.Target
+ cc resolver.ClientConn
+}
+
+func (r *passthroughResolver) start() {
+ r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
+}
+
+func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOption) {}
+
+func (*passthroughResolver) Close() {}
+
+func init() {
+ resolver.Register(&passthroughBuilder{})
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index b8e0aa4..ddee20b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -107,8 +107,8 @@
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) (bool, error) // Used only on the client side.
+ endStream bool // Valid on server side.
+ initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
@@ -637,21 +637,17 @@
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
- sendPing, err := hdr.initStream(str.id)
- if err != nil {
+ if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
- if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
- if sendPing {
- return l.pingHandler(&ping{data: [8]byte{}})
- }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 41a79c5..294661a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -35,6 +35,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
@@ -46,6 +47,7 @@
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
+ lastRead int64 // keep this field 64-bit aligned
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -62,8 +64,6 @@
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -77,9 +77,6 @@
perRPCCreds []credentials.PerRPCCredentials
- // Boolean to keep track of reading activity on transport.
- // 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
@@ -110,6 +107,16 @@
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // A condition variable used to signal when the keepalive goroutine should
+ // go dormant. The condition for dormancy is based on the number of active
+ // streams and the `PermitWithoutStream` keepalive client parameter. And
+ // since the number of active streams is guarded by the above mutex, we use
+ // the same for this condition variable as well.
+ kpDormancyCond *sync.Cond
+ // A boolean to track whether the keepalive goroutine is dormant or not.
+ // This is checked before attempting to signal the above condition
+ // variable.
+ kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -232,7 +239,6 @@
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -264,9 +270,6 @@
updateFlowControl: t.updateFlowControl,
}
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -281,6 +284,7 @@
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
+ t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -347,6 +351,7 @@
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
+ ct: t,
done: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
@@ -380,23 +385,23 @@
}
func (t *http2Client) getPeer() *peer.Peer {
- pr := &peer.Peer{
- Addr: t.remoteAddr,
+ return &peer.Peer{
+ Addr: t.remoteAddr,
+ AuthInfo: t.authInfo,
}
- // Attach Auth info if there is any.
- if t.authInfo != nil {
- pr.AuthInfo = t.authInfo
- }
- return pr
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
- authData, err := t.getTrAuthData(ctx, aud)
+ ri := credentials.RequestInfo{
+ Method: callHdr.Method,
+ }
+ ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
+ authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil {
return nil, err
}
- callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
+ callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
if err != nil {
return nil, err
}
@@ -564,7 +569,7 @@
hdr := &headerFrame{
hf: headerFields,
endStream: false,
- initStream: func(id uint32) (bool, error) {
+ initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -574,29 +579,19 @@
err = ErrConnClosing
}
cleanup(err)
- return false, err
+ return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
- var sendPing bool
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 && t.keepaliveEnabled {
- select {
- case t.awakenKeepalive <- struct{}{}:
- sendPing = true
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
- }
+ // If the keepalive goroutine has gone dormant, wake it up.
+ if t.kpDormant {
+ t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- return sendPing, nil
+ return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -778,6 +773,11 @@
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
+ if t.kpDormant {
+ // If the keepalive goroutine is blocked on this condition variable, we
+ // should unblock it so that the goroutine eventually exits.
+ t.kpDormancyCond.Signal()
+ }
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@@ -853,11 +853,11 @@
return t.controlBuf.put(df)
}
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
+func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
+ s := t.activeStreams[f.Header().StreamID]
+ t.mu.Unlock()
+ return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -937,8 +937,8 @@
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if size > 0 {
@@ -969,8 +969,8 @@
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -1147,8 +1147,8 @@
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
+ s := t.getStream(frame)
+ if s == nil {
return
}
endStream := frame.StreamEnded()
@@ -1191,6 +1191,7 @@
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
+ s.headerValid = true
if !endStream {
// HEADERS frame block carries a Response-Headers.
isHeader = true
@@ -1233,7 +1234,7 @@
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
@@ -1248,7 +1249,7 @@
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
@@ -1292,56 +1293,84 @@
}
}
+func minTime(a, b time.Duration) time.Duration {
+ if a < b {
+ return a
+ }
+ return b
+}
+
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
+ // True iff a ping has been sent, and no data has been received since then.
+ outstandingPing := false
+ // Amount of time remaining before which we should receive an ACK for the
+ // last sent ping.
+ timeoutLeft := time.Duration(0)
+ // Records the last value of t.lastRead before we go block on the timer.
+ // This is required to check for read activity since then.
+ prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
+ lastRead := atomic.LoadInt64(&t.lastRead)
+ if lastRead > prevNano {
+ // There has been read activity since the last time we were here.
+ outstandingPing = false
+ // Next timer should fire at kp.Time seconds from lastRead time.
+ timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
+ prevNano = lastRead
continue
}
- // Check if keepalive should go dormant.
+ if outstandingPing && timeoutLeft <= 0 {
+ t.Close()
+ return
+ }
t.mu.Lock()
+ if t.state == closing {
+ // If the transport is closing, we should exit from the
+ // keepalive goroutine here. If not, we could have a race
+ // between the call to Signal() from Close() and the call to
+ // Wait() here, whereby the keepalive goroutine ends up
+ // blocking on the condition variable which will never be
+ // signalled again.
+ t.mu.Unlock()
+ return
+ }
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
- t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.ctx.Done():
- return
- }
- } else {
- t.mu.Unlock()
+ // If a ping was sent out previously (because there were active
+ // streams at that point) which wasn't acked and its timeout
+ // hadn't fired, but we got here and are about to go dormant,
+ // we should make sure that we unconditionally send a ping once
+ // we awaken.
+ outstandingPing = false
+ t.kpDormant = true
+ t.kpDormancyCond.Wait()
+ }
+ t.kpDormant = false
+ t.mu.Unlock()
+
+ // We get here either because we were dormant and a new stream was
+ // created which unblocked the Wait() call, or because the
+ // keepalive timer expired. In both cases, we need to send a ping.
+ if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
- // Send ping.
t.controlBuf.put(p)
+ timeoutLeft = t.kp.Timeout
+ outstandingPing = true
}
-
- // By the time control gets here a ping has been sent one way or the other.
- timer.Reset(t.kp.Timeout)
- select {
- case <-timer.C:
- if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
- timer.Reset(t.kp.Time)
- continue
- }
- infof("transport: closing client transport due to idleness.")
- t.Close()
- return
- case <-t.ctx.Done():
- if !timer.Stop() {
- <-timer.C
- }
- return
- }
+ // The amount of time to sleep here is the minimum of kp.Time and
+ // timeoutLeft. This will ensure that we wait only for kp.Time
+ // before sending out the next ping (for cases where the ping is
+ // acked).
+ sleepDuration := minTime(t.kp.Time, timeoutLeft)
+ timeoutLeft -= sleepDuration
+ prevNano = lastRead
+ timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 4e26f6a..0760383 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -65,8 +65,7 @@
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
- ctxDone <-chan struct{} // Cache the context.Done() chan
- cancel context.CancelFunc
+ done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -175,6 +174,12 @@
Val: *config.MaxHeaderListSize,
})
}
+ if config.HeaderTableSize != nil {
+ isettings = append(isettings, http2.Setting{
+ ID: http2.SettingHeaderTableSize,
+ Val: *config.HeaderTableSize,
+ })
+ }
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
@@ -206,11 +211,10 @@
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
t := &http2Server{
- ctx: ctx,
- cancel: cancel,
- ctxDone: ctx.Done(),
+ ctx: context.Background(),
+ done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -231,7 +235,7 @@
czData: new(channelzData),
bufferPool: newBufferPool(),
}
- t.controlBuf = newControlBuffer(t.ctxDone)
+ t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -362,12 +366,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
+ s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -378,12 +384,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ s.cancel()
return true
}
t.maxStreamID = streamID
@@ -749,7 +757,7 @@
return true
}
-// WriteHeader sends the header metedata md back to the client.
+// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
@@ -885,7 +893,7 @@
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -907,7 +915,7 @@
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -973,7 +981,7 @@
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
- case <-t.ctx.Done():
+ case <-t.done:
}
return
case <-keepalive.C:
@@ -995,7 +1003,7 @@
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
- case <-t.ctx.Done():
+ case <-t.done:
return
}
}
@@ -1015,7 +1023,7 @@
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
- t.cancel()
+ close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1155,7 +1163,7 @@
select {
case <-t.drainChan:
case <-timer.C:
- case <-t.ctx.Done():
+ case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1205,7 +1213,7 @@
select {
case sz := <-resp:
return int64(sz)
- case <-t.ctxDone:
+ case <-t.done:
return -1
case <-timer.C:
return -2
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 1c1d106..bfab940 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -233,6 +233,7 @@
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
+ ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
@@ -251,6 +252,10 @@
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ // headerValid indicates whether a valid header was received. Only
+ // meaningful after headerChan is closed (always call waitOnHeader() before
+ // reading its value). Not valid on server side.
+ headerValid bool
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -303,34 +308,28 @@
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
-func (s *Stream) waitOnHeader() error {
+func (s *Stream) waitOnHeader() {
if s.headerChan == nil {
// On the server headerChan is always nil since a stream originates
// only after having received headers.
- return nil
+ return
}
select {
case <-s.ctx.Done():
- // We prefer success over failure when reading messages because we delay
- // context error in stream.Read(). To keep behavior consistent, we also
- // prefer success here.
- select {
- case <-s.headerChan:
- return nil
- default:
- }
- return ContextErr(s.ctx.Err())
+ // Close the stream to prevent headers/trailers from changing after
+ // this function returns.
+ s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
+ // headerChan could possibly not be closed yet if closeStream raced
+ // with operateHeaders; wait until it is closed explicitly here.
+ <-s.headerChan
case <-s.headerChan:
- return nil
}
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
- if err := s.waitOnHeader(); err != nil {
- return ""
- }
+ s.waitOnHeader()
return s.recvCompress
}
@@ -351,36 +350,27 @@
// available. It blocks until i) the metadata is ready or ii) there is no header
// metadata or iii) the stream is canceled/expired.
//
-// On server side, it returns the out header after t.WriteHeader is called.
+// On server side, it returns the out header after t.WriteHeader is called. It
+// does not block and must not be called until after WriteHeader.
func (s *Stream) Header() (metadata.MD, error) {
- if s.headerChan == nil && s.header != nil {
+ if s.headerChan == nil {
// On server side, return the header in stream. It will be the out
// header after t.WriteHeader is called.
return s.header.Copy(), nil
}
- err := s.waitOnHeader()
- // Even if the stream is closed, header is returned if available.
- select {
- case <-s.headerChan:
- if s.header == nil {
- return nil, nil
- }
- return s.header.Copy(), nil
- default:
+ s.waitOnHeader()
+ if !s.headerValid {
+ return nil, s.status.Err()
}
- return nil, err
+ return s.header.Copy(), nil
}
// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only. If the stream ends
-// before headers are received, returns true, nil. If a context error happens
-// first, returns it as a status error. Client-side only.
-func (s *Stream) TrailersOnly() (bool, error) {
- err := s.waitOnHeader()
- if err != nil {
- return false, err
- }
- return s.noHeaders, nil
+// before headers are received, returns true, nil. Client-side only.
+func (s *Stream) TrailersOnly() bool {
+ s.waitOnHeader()
+ return s.noHeaders
}
// Trailer returns the cached trailer metedata. Note that if it is not called
@@ -534,6 +524,7 @@
ReadBufferSize int
ChannelzParentID int64
MaxHeaderListSize *uint32
+ HeaderTableSize *uint32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error