VOL-2138 Use v2 import paths for voltha-lib-go;
migrate from voltha-go to voltha-lib-go
Change-Id: I3db6759f3c0cea3c2164889b3d36eae708b19bde
diff --git a/vendor/google.golang.org/grpc/internal/balancerload/load.go b/vendor/google.golang.org/grpc/internal/balancerload/load.go
new file mode 100644
index 0000000..3a905d9
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/balancerload/load.go
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package balancerload defines APIs to parse server loads in trailers. The
+// parsed loads are sent to balancers in DoneInfo.
+package balancerload
+
+import (
+ "google.golang.org/grpc/metadata"
+)
+
+// Parser converts loads from metadata into a concrete type.
+type Parser interface {
+ // Parse parses loads from metadata.
+ Parse(md metadata.MD) interface{}
+}
+
+var parser Parser
+
+// SetParser sets the load parser.
+//
+// Not mutex-protected, should be called before any gRPC functions.
+func SetParser(lr Parser) {
+ parser = lr
+}
+
+// Parse calls parser.Read().
+func Parse(md metadata.MD) interface{} {
+ if parser == nil {
+ return nil
+ }
+ return parser.Parse(md)
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index 041520d..f0744f9 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -24,6 +24,7 @@
package channelz
import (
+ "fmt"
"sort"
"sync"
"sync/atomic"
@@ -95,9 +96,14 @@
// NewChannelzStorage initializes channelz data storage and id generator.
//
+// This function returns a cleanup function to wait for all channelz state to be reset by the
+// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
+// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
+// to remove some entity just register by the new test, since the id space is the same.
+//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
-func NewChannelzStorage() {
+func NewChannelzStorage() (cleanup func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
@@ -107,6 +113,28 @@
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
+ return func() error {
+ var err error
+ cm := db.get()
+ if cm == nil {
+ return nil
+ }
+ for i := 0; i < 1000; i++ {
+ cm.mu.Lock()
+ if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
+ cm.mu.Unlock()
+ // all things stored in the channelz map have been cleared.
+ return nil
+ }
+ cm.mu.Unlock()
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ cm.mu.Lock()
+ err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
+ cm.mu.Unlock()
+ return err
+ }
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 62ed0f2..3ee8740 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -25,47 +25,11 @@
)
const (
- prefix = "GRPC_GO_"
- retryStr = prefix + "RETRY"
- requireHandshakeStr = prefix + "REQUIRE_HANDSHAKE"
-)
-
-// RequireHandshakeSetting describes the settings for handshaking.
-type RequireHandshakeSetting int
-
-const (
- // RequireHandshakeHybrid (default, deprecated) indicates to not wait for
- // handshake before considering a connection ready, but wait before
- // considering successful.
- RequireHandshakeHybrid RequireHandshakeSetting = iota
- // RequireHandshakeOn (default after the 1.17 release) indicates to wait
- // for handshake before considering a connection ready/successful.
- RequireHandshakeOn
- // RequireHandshakeOff indicates to not wait for handshake before
- // considering a connection ready/successful.
- RequireHandshakeOff
+ prefix = "GRPC_GO_"
+ retryStr = prefix + "RETRY"
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
- // RequireHandshake is set based upon the GRPC_GO_REQUIRE_HANDSHAKE
- // environment variable.
- //
- // Will be removed after the 1.18 release.
- RequireHandshake RequireHandshakeSetting
)
-
-func init() {
- switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
- case "on":
- fallthrough
- default:
- RequireHandshake = RequireHandshakeOn
- case "off":
- RequireHandshake = RequireHandshakeOff
- case "hybrid":
- // Will be removed after the 1.17 release.
- RequireHandshake = RequireHandshakeHybrid
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index c1d2c69..bc1f99a 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -23,6 +23,8 @@
import (
"context"
"time"
+
+ "google.golang.org/grpc/connectivity"
)
var (
@@ -37,10 +39,25 @@
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
+ // ParseServiceConfig is a function to parse JSON service configs into
+ // opaque data structures.
+ ParseServiceConfig func(sc string) (interface{}, error)
+ // StatusRawProto is exported by status/status.go. This func returns a
+ // pointer to the wrapped Status proto for a given status.Status without a
+ // call to proto.Clone(). The returned Status proto should not be mutated by
+ // the caller.
+ StatusRawProto interface{} // func (*status.Status) *spb.Status
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
-type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+//
+// The implementation is expected to create a health checking RPC stream by
+// calling newStream(), watch for the health status of serviceName, and report
+// it's health back by calling setConnectivityState().
+//
+// The health checking protocol is defined at:
+// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
+type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
index 61678fe..d3fd9da 100644
--- a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go
@@ -22,18 +22,24 @@
import (
"net"
+ "sync"
"time"
"google.golang.org/grpc/grpclog"
)
-func init() {
- grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.")
+var once sync.Once
+
+func log() {
+ once.Do(func() {
+ grpclog.Info("CPU time info is unavailable on non-linux or appengine environment.")
+ })
}
// GetCPUTime returns the how much CPU time has passed since the start of this process.
// It always returns 0 under non-linux or appengine environment.
func GetCPUTime() int64 {
+ log()
return 0
}
@@ -42,22 +48,26 @@
// GetRusage is a no-op function under non-linux or appengine environment.
func GetRusage() (rusage *Rusage) {
+ log()
return nil
}
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
// between two Rusage structs. It a no-op function for non-linux or appengine environment.
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
+ log()
return 0, 0
}
// SetTCPUserTimeout is a no-op function under non-linux or appengine environments
func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error {
+ log()
return nil
}
// GetTCPUserTimeout is a no-op function under non-linux or appengine environments
// a negative return value indicates the operation is not supported
func GetTCPUserTimeout(conn net.Conn) (int, error) {
+ log()
return -1, nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 204ba15..ddee20b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -23,6 +23,7 @@
"fmt"
"runtime"
"sync"
+ "sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
@@ -84,24 +85,40 @@
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
+// maxQueuedTransportResponseFrames is the most queued "transport response"
+// frames we will buffer before preventing new reads from occurring on the
+// transport. These are control frames sent in response to client requests,
+// such as RST_STREAM due to bad headers or settings acks.
+const maxQueuedTransportResponseFrames = 50
+
+type cbItem interface {
+ isTransportResponseFrame() bool
+}
+
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
+func (*registerStream) isTransportResponseFrame() bool { return false }
+
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) (bool, error) // Used only on the client side.
+ endStream bool // Valid on server side.
+ initStream func(uint32) error // Used only on the client side.
onWrite func()
wq *writeQuota // write quota for the stream created.
cleanup *cleanupStream // Valid on the server side.
onOrphaned func(error) // Valid on client-side
}
+func (h *headerFrame) isTransportResponseFrame() bool {
+ return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
+}
+
type cleanupStream struct {
streamID uint32
rst bool
@@ -109,6 +126,8 @@
onWrite func()
}
+func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
+
type dataFrame struct {
streamID uint32
endStream bool
@@ -119,27 +138,41 @@
onEachWrite func()
}
+func (*dataFrame) isTransportResponseFrame() bool { return false }
+
type incomingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
+
type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}
+func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
+ return false // window updates are throttled by thresholds
+}
+
type incomingSettings struct {
ss []http2.Setting
}
+func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
+
type outgoingSettings struct {
ss []http2.Setting
}
+func (*outgoingSettings) isTransportResponseFrame() bool { return false }
+
type incomingGoAway struct {
}
+func (*incomingGoAway) isTransportResponseFrame() bool { return false }
+
type goAway struct {
code http2.ErrCode
debugData []byte
@@ -147,15 +180,21 @@
closeConn bool
}
+func (*goAway) isTransportResponseFrame() bool { return false }
+
type ping struct {
ack bool
data [8]byte
}
+func (*ping) isTransportResponseFrame() bool { return true }
+
type outFlowControlSizeRequest struct {
resp chan uint32
}
+func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
+
type outStreamState int
const (
@@ -238,6 +277,14 @@
consumerWaiting bool
list *itemList
err error
+
+ // transportResponseFrames counts the number of queued items that represent
+ // the response of an action initiated by the peer. trfChan is created
+ // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
+ // closed and nilled when transportResponseFrames drops below the
+ // threshold. Both fields are protected by mu.
+ transportResponseFrames int
+ trfChan atomic.Value // *chan struct{}
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
@@ -248,12 +295,24 @@
}
}
-func (c *controlBuffer) put(it interface{}) error {
+// throttle blocks if there are too many incomingSettings/cleanupStreams in the
+// controlbuf.
+func (c *controlBuffer) throttle() {
+ ch, _ := c.trfChan.Load().(*chan struct{})
+ if ch != nil {
+ select {
+ case <-*ch:
+ case <-c.done:
+ }
+ }
+}
+
+func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -271,6 +330,15 @@
c.consumerWaiting = false
}
c.list.enqueue(it)
+ if it.isTransportResponseFrame() {
+ c.transportResponseFrames++
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are adding the frame that puts us over the threshold; create
+ // a throttling channel.
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
+ }
+ }
c.mu.Unlock()
if wakeUp {
select {
@@ -304,7 +372,17 @@
return nil, c.err
}
if !c.list.isEmpty() {
- h := c.list.dequeue()
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Load().(*chan struct{})
+ close(*ch)
+ c.trfChan.Store((*chan struct{})(nil))
+ }
+ c.transportResponseFrames--
+ }
c.mu.Unlock()
return h, nil
}
@@ -559,21 +637,17 @@
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
- sendPing, err := hdr.initStream(str.id)
- if err != nil {
+ if err := hdr.initStream(str.id); err != nil {
if err == ErrConnClosing {
return err
}
// Other errors(errStreamDrain) need not close transport.
return nil
}
- if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
return err
}
l.estdStreams[str.id] = str
- if sendPing {
- return l.pingHandler(&ping{data: [8]byte{}})
- }
return nil
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index 5ea997a..f262edd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
+++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
@@ -149,6 +149,7 @@
n = uint32(math.MaxInt32)
}
f.mu.Lock()
+ defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -169,10 +170,8 @@
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
- f.mu.Unlock()
return f.delta
}
- f.mu.Unlock()
return 0
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 73b41ea..78f9ddc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -24,6 +24,7 @@
package transport
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -63,9 +64,6 @@
if _, ok := w.(http.Flusher); !ok {
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
}
- if _, ok := w.(http.CloseNotifier); !ok {
- return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
- }
st := &serverHandlerTransport{
rw: w,
@@ -176,17 +174,11 @@
// do runs fn in the ServeHTTP goroutine.
func (ht *serverHandlerTransport) do(fn func()) error {
- // Avoid a panic writing to closed channel. Imperfect but maybe good enough.
select {
case <-ht.closedCh:
return ErrConnClosing
- default:
- select {
- case ht.writes <- fn:
- return nil
- case <-ht.closedCh:
- return ErrConnClosing
- }
+ case ht.writes <- fn:
+ return nil
}
}
@@ -237,7 +229,6 @@
if ht.stats != nil {
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
- close(ht.writes)
}
ht.Close()
return err
@@ -315,19 +306,13 @@
ctx, cancel = context.WithCancel(ctx)
}
- // requestOver is closed when either the request's context is done
- // or the status has been written via WriteStatus.
+ // requestOver is closed when the status has been written via WriteStatus.
requestOver := make(chan struct{})
-
- // clientGone receives a single value if peer is gone, either
- // because the underlying connection is dead or because the
- // peer sends an http2 RST_STREAM.
- clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
go func() {
select {
case <-requestOver:
case <-ht.closedCh:
- case <-clientGone:
+ case <-ht.req.Context().Done():
}
cancel()
ht.Close()
@@ -363,7 +348,7 @@
ht.stats.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
- reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
+ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
}
@@ -377,7 +362,7 @@
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
- s.buf.put(recvMsg{data: buf[:n:n]})
+ s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
}
if err != nil {
@@ -407,10 +392,7 @@
func (ht *serverHandlerTransport) runStream() {
for {
select {
- case fn, ok := <-ht.writes:
- if !ok {
- return
- }
+ case fn := <-ht.writes:
fn()
case <-ht.closedCh:
return
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index ff8f4db..9bd8c27 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -62,8 +62,6 @@
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
- // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
- awakenKeepalive chan struct{}
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
@@ -110,6 +108,16 @@
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+ // A condition variable used to signal when the keepalive goroutine should
+ // go dormant. The condition for dormancy is based on the number of active
+ // streams and the `PermitWithoutStream` keepalive client parameter. And
+ // since the number of active streams is guarded by the above mutex, we use
+ // the same for this condition variable as well.
+ kpDormancyCond *sync.Cond
+ // A boolean to track whether the keepalive goroutine is dormant or not.
+ // This is checked before attempting to signal the above condition
+ // variable.
+ kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
@@ -117,6 +125,8 @@
onGoAway func(GoAwayReason)
onClose func()
+
+ bufferPool *bufferPool
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -230,7 +240,6 @@
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -249,6 +258,7 @@
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
+ bufferPool: newBufferPool(),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@@ -261,9 +271,6 @@
updateFlowControl: t.updateFlowControl,
}
}
- // Make sure awakenKeepalive can't be written upon.
- // keepalive routine will make it writable, if need be.
- t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -278,6 +285,7 @@
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
+ t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
@@ -367,6 +375,7 @@
closeStream: func(err error) {
t.CloseStream(s, err)
},
+ freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -437,6 +446,15 @@
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
var k string
+ for k, vv := range md {
+ // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, v := range vv {
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+ }
+ }
for _, vv := range added {
for i, v := range vv {
if i%2 == 0 {
@@ -450,15 +468,6 @@
headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
}
}
- for k, vv := range md {
- // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
}
if md, ok := t.md.(*metadata.MD); ok {
for k, vv := range *md {
@@ -489,6 +498,9 @@
}
func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
+ if len(t.perRPCCreds) == 0 {
+ return nil, nil
+ }
authData := map[string]string{}
for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience)
@@ -509,7 +521,7 @@
}
func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
- callAuthData := map[string]string{}
+ var callAuthData map[string]string
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
@@ -521,6 +533,7 @@
if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err)
}
+ callAuthData = make(map[string]string, len(data))
for k, v := range data {
// Capital header names are illegal in HTTP/2
k = strings.ToLower(k)
@@ -549,15 +562,14 @@
s.write(recvMsg{err: err})
close(s.done)
// If headerChan isn't closed, then close it.
- if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}
-
}
hdr := &headerFrame{
hf: headerFields,
endStream: false,
- initStream: func(id uint32) (bool, error) {
+ initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
t.mu.Unlock()
@@ -567,29 +579,19 @@
err = ErrConnClosing
}
cleanup(err)
- return false, err
+ return err
}
t.activeStreams[id] = s
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
- var sendPing bool
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 && t.keepaliveEnabled {
- select {
- case t.awakenKeepalive <- struct{}{}:
- sendPing = true
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
- }
+ // If the keepalive goroutine has gone dormant, wake it up.
+ if t.kpDormant {
+ t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- return sendPing, nil
+ return nil
},
onOrphaned: cleanup,
wq: s.wq,
@@ -713,7 +715,7 @@
s.write(recvMsg{err: err})
}
// If headerChan isn't closed, then close it.
- if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.noHeaders = true
close(s.headerChan)
}
@@ -765,9 +767,17 @@
t.mu.Unlock()
return nil
}
+ // Call t.onClose before setting the state to closing to prevent the client
+ // from attempting to create new streams ASAP.
+ t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
+ if t.kpDormant {
+ // If the keepalive goroutine is blocked on this condition variable, we
+ // should unblock it so that the goroutine eventually exits.
+ t.kpDormancyCond.Signal()
+ }
t.mu.Unlock()
t.controlBuf.finish()
t.cancel()
@@ -785,7 +795,6 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- t.onClose()
return err
}
@@ -794,21 +803,21 @@
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
-func (t *http2Client) GracefulClose() error {
+func (t *http2Client) GracefulClose() {
t.mu.Lock()
// Make sure we move to draining only from active.
if t.state == draining || t.state == closing {
t.mu.Unlock()
- return nil
+ return
}
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
- return t.Close()
+ t.Close()
+ return
}
t.controlBuf.put(&incomingGoAway{})
- return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
@@ -844,11 +853,11 @@
return t.controlBuf.put(df)
}
-func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
+func (t *http2Client) getStream(f http2.Frame) *Stream {
t.mu.Lock()
- defer t.mu.Unlock()
- s, ok := t.activeStreams[f.Header().StreamID]
- return s, ok
+ s := t.activeStreams[f.Header().StreamID]
+ t.mu.Unlock()
+ return s
}
// adjustWindow sends out extra window update over the initial window size
@@ -928,8 +937,8 @@
t.controlBuf.put(bdpPing)
}
// Select the right stream to dispatch.
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if size > 0 {
@@ -946,9 +955,10 @@
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ buffer := t.bufferPool.get()
+ buffer.Reset()
+ buffer.Write(f.Data())
+ s.write(recvMsg{buffer: buffer})
}
}
// The server has closed the stream without sending trailers. Record that
@@ -959,8 +969,8 @@
}
func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ s := t.getStream(f)
+ if s == nil {
return
}
if f.ErrCode == http2.ErrCodeRefusedStream {
@@ -973,9 +983,9 @@
statusCode = codes.Unknown
}
if statusCode == codes.Canceled {
- // Our deadline was already exceeded, and that was likely the cause of
- // this cancelation. Alter the status code accordingly.
- if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
+ if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
+ // Our deadline was already exceeded, and that was likely the cause
+ // of this cancelation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1080,11 +1090,12 @@
default:
t.setGoAwayReason(f)
close(t.goAway)
- t.state = draining
t.controlBuf.put(&incomingGoAway{})
-
- // This has to be a new goroutine because we're still using the current goroutine to read in the transport.
+ // Notify the clientconn about the GOAWAY before we set the state to
+ // draining, to allow the client to stop attempting to create streams
+ // before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
+ t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -1136,20 +1147,30 @@
// operateHeaders takes action on the decoded headers.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
- s, ok := t.getStream(frame)
- if !ok {
+ s := t.getStream(frame)
+ if s == nil {
return
}
+ endStream := frame.StreamEnded()
atomic.StoreUint32(&s.bytesReceived, 1)
- var state decodeState
- if err := state.decodeHeader(frame); err != nil {
- t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
- // Something wrong. Stops reading even when there is remaining.
+ initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
+
+ if !initialHeader && !endStream {
+ // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
+ st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
+ t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
return
}
- endStream := frame.StreamEnded()
- var isHeader bool
+ state := &decodeState{}
+ // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
+ state.data.isGRPC = !initialHeader
+ if err := state.decodeHeader(frame); err != nil {
+ t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
+ return
+ }
+
+ isHeader := false
defer func() {
if t.statsHandler != nil {
if isHeader {
@@ -1167,29 +1188,33 @@
}
}
}()
- // If headers haven't been received yet.
- if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+
+ // If headerChan hasn't been closed yet
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
if !endStream {
- // Headers frame is not actually a trailers-only frame.
+ // HEADERS frame block carries a Response-Headers.
isHeader = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
- s.recvCompress = state.encoding
- if len(state.mdata) > 0 {
- s.header = state.mdata
+ s.recvCompress = state.data.encoding
+ if len(state.data.mdata) > 0 {
+ s.header = state.data.mdata
}
} else {
+ // HEADERS frame block carries a Trailers-Only.
s.noHeaders = true
}
close(s.headerChan)
}
+
if !endStream {
return
}
+
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
- t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true)
+ t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
@@ -1220,6 +1245,7 @@
// loop to keep reading incoming messages on this transport.
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
@@ -1277,29 +1303,32 @@
timer.Reset(t.kp.Time)
continue
}
- // Check if keepalive should go dormant.
t.mu.Lock()
- if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
- // Make awakenKeepalive writable.
- <-t.awakenKeepalive
+ if t.state == closing {
+ // If the transport is closing, we should exit from the
+ // keepalive goroutine here. If not, we could have a race
+ // between the call to Signal() from Close() and the call to
+ // Wait() here, whereby the keepalive goroutine ends up
+ // blocking on the condition variable which will never be
+ // signalled again.
t.mu.Unlock()
- select {
- case <-t.awakenKeepalive:
- // If the control gets here a ping has been sent
- // need to reset the timer with keepalive.Timeout.
- case <-t.ctx.Done():
- return
- }
- } else {
- t.mu.Unlock()
- if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
- }
- // Send ping.
- t.controlBuf.put(p)
+ return
}
+ if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+ t.kpDormant = true
+ t.kpDormancyCond.Wait()
+ }
+ t.kpDormant = false
+ t.mu.Unlock()
- // By the time control gets here a ping has been sent one way or the other.
+ if channelz.IsOn() {
+ atomic.AddInt64(&t.czData.kpCount, 1)
+ }
+ // We get here either because we were dormant and a new stream was
+ // created which unblocked the Wait() call, or because the
+ // keepalive timer expired. In both cases, we need to send a ping.
+ t.controlBuf.put(p)
+
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
@@ -1307,6 +1336,7 @@
timer.Reset(t.kp.Time)
continue
}
+ infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
@@ -1356,6 +1386,8 @@
return &s
}
+func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
+
func (t *http2Client) IncrMsgSent() {
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index d038b2d..33686a1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,9 +35,11 @@
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
@@ -55,13 +57,15 @@
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+ // statusRawProto is a function to get to the raw status proto wrapped in a
+ // status.Status without a proto.Clone().
+ statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
- ctxDone <-chan struct{} // Cache the context.Done() chan
- cancel context.CancelFunc
+ done chan struct{}
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
@@ -119,6 +123,7 @@
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czData *channelzData
+ bufferPool *bufferPool
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -132,7 +137,10 @@
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
- var isettings []http2.Setting
+ isettings := []http2.Setting{{
+ ID: http2.SettingMaxFrameSize,
+ Val: http2MaxFrameLen,
+ }}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
@@ -197,11 +205,10 @@
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
t := &http2Server{
- ctx: ctx,
- cancel: cancel,
- ctxDone: ctx.Done(),
+ ctx: context.Background(),
+ done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
@@ -220,8 +227,9 @@
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
+ bufferPool: newBufferPool(),
}
- t.controlBuf = newControlBuffer(t.ctxDone)
+ t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -286,7 +294,9 @@
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
- state := decodeState{serverSide: true}
+ state := &decodeState{
+ serverSide: true,
+ }
if err := state.decodeHeader(frame); err != nil {
if se, ok := status.FromError(err); ok {
t.controlBuf.put(&cleanupStream{
@@ -305,16 +315,16 @@
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
- recvCompress: state.encoding,
- method: state.method,
- contentSubtype: state.contentSubtype,
+ recvCompress: state.data.encoding,
+ method: state.data.method,
+ contentSubtype: state.data.contentSubtype,
}
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
- if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
+ if state.data.timeoutSet {
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
@@ -327,19 +337,19 @@
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
- if len(state.mdata) > 0 {
- s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
+ if len(state.data.mdata) > 0 {
+ s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
}
- if state.statsTags != nil {
- s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
+ if state.data.statsTags != nil {
+ s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
}
- if state.statsTrace != nil {
- s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
+ if state.data.statsTrace != nil {
+ s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
}
if t.inTapHandle != nil {
var err error
info := &tap.Info{
- FullMethodName: state.method,
+ FullMethodName: state.data.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
@@ -350,12 +360,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
}
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
+ s.cancel()
return false
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
@@ -366,12 +378,14 @@
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
+ s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
+ s.cancel()
return true
}
t.maxStreamID = streamID
@@ -403,9 +417,10 @@
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- ctxDone: s.ctxDone,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
+ freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -426,6 +441,7 @@
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
+ t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
@@ -435,7 +451,7 @@
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
- t.closeStream(s, true, se.Code, nil, false)
+ t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
@@ -577,7 +593,7 @@
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
- t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
+ t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
@@ -589,9 +605,10 @@
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- data := make([]byte, len(f.Data()))
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ buffer := t.bufferPool.get()
+ buffer.Reset()
+ buffer.Write(f.Data())
+ s.write(recvMsg{buffer: buffer})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
@@ -602,11 +619,18 @@
}
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
- s, ok := t.getStream(f)
- if !ok {
+ // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
+ if s, ok := t.getStream(f); ok {
+ t.closeStream(s, false, 0, false)
return
}
- t.closeStream(s, false, 0, nil, false)
+ // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
+ t.controlBuf.put(&cleanupStream{
+ streamID: f.Header().StreamID,
+ rst: false,
+ rstCode: 0,
+ onWrite: func() {},
+ })
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -748,6 +772,10 @@
return nil
}
+func (t *http2Server) setResetPingStrikes() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+}
+
func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
@@ -762,15 +790,13 @@
streamID: s.id,
hf: headerFields,
endStream: false,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
})
if !success {
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
@@ -808,7 +834,7 @@
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
- if p := st.Proto(); p != nil && len(p.Details) > 0 {
+ if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
@@ -824,9 +850,7 @@
streamID: s.id,
hf: headerFields,
endStream: true,
- onWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
@@ -834,10 +858,12 @@
if err != nil {
return err
}
- t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+ t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
- t.closeStream(s, false, 0, trailingHeader, true)
+ // Send a RST_STREAM after the trailers if the client has not already half-closed.
+ rst := s.getState() == streamActive
+ t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
@@ -849,6 +875,9 @@
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
+ if _, ok := err.(ConnectionError); ok {
+ return err
+ }
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
}
@@ -858,7 +887,7 @@
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -873,16 +902,14 @@
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
df := &dataFrame{
- streamID: s.id,
- h: hdr,
- d: data,
- onEachWrite: func() {
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- },
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
- case <-t.ctx.Done():
+ case <-t.done:
return ErrConnClosing
default:
}
@@ -944,10 +971,11 @@
select {
case <-maxAge.C:
// Close the connection after grace period.
+ infof("transport: closing server transport due to maximum connection age.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
- case <-t.ctx.Done():
+ case <-t.done:
}
return
case <-keepalive.C:
@@ -957,6 +985,7 @@
continue
}
if pingSent {
+ infof("transport: closing server transport due to idleness.")
t.Close()
// Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
@@ -968,7 +997,7 @@
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
- case <-t.ctx.Done():
+ case <-t.done:
return
}
}
@@ -988,7 +1017,7 @@
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
- t.cancel()
+ close(t.done)
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
@@ -1006,15 +1035,17 @@
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
- t.mu.Lock()
- if _, ok := t.activeStreams[s.id]; !ok {
- t.mu.Unlock()
- return
- }
+ // In case stream sending and receiving are invoked in separate
+ // goroutines (e.g., bi-directional streaming), cancel needs to be
+ // called to interrupt the potential blocking on other goroutines.
+ s.cancel()
- delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
+ t.mu.Lock()
+ if _, ok := t.activeStreams[s.id]; ok {
+ delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
}
t.mu.Unlock()
@@ -1027,51 +1058,36 @@
}
}
-// closeStream clears the footprint of a stream when the stream is not needed
-// any more.
-func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
- // Mark the stream as done
+// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
+func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
oldState := s.swapState(streamDone)
+ if oldState == streamDone {
+ // If the stream was already done, return.
+ return
+ }
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), cancel needs to be
- // called to interrupt the potential blocking on other goroutines.
- s.cancel()
+ hdr.cleanup = &cleanupStream{
+ streamID: s.id,
+ rst: rst,
+ rstCode: rstCode,
+ onWrite: func() {
+ t.deleteStream(s, eosReceived)
+ },
+ }
+ t.controlBuf.put(hdr)
+}
- // Deletes the stream from active streams
+// closeStream clears the footprint of a stream when the stream is not needed any more.
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
+ s.swapState(streamDone)
t.deleteStream(s, eosReceived)
- cleanup := &cleanupStream{
+ t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {},
- }
-
- // No trailer. Puts cleanupFrame into transport's control buffer.
- if hdr == nil {
- t.controlBuf.put(cleanup)
- return
- }
-
- // We do the check here, because of the following scenario:
- // 1. closeStream is called first with a trailer. A trailer item with a piggybacked cleanup item
- // is put to control buffer.
- // 2. Loopy writer is waiting on a stream quota. It will never get it because client errored at
- // some point. So loopy can't act on trailer
- // 3. Client sends a RST_STREAM due to the error. Then closeStream is called without a trailer as
- // the result of the received RST_STREAM.
- // If we do this check at the beginning of the closeStream, then we won't put a cleanup item in
- // response to received RST_STREAM into the control buffer and outStream in loopy writer will
- // never get cleaned up.
-
- // If the stream is already done, don't send the trailer.
- if oldState == streamDone {
- return
- }
-
- hdr.cleanup = cleanup
- t.controlBuf.put(hdr)
+ })
}
func (t *http2Server) RemoteAddr() net.Addr {
@@ -1141,7 +1157,7 @@
select {
case <-t.drainChan:
case <-timer.C:
- case <-t.ctx.Done():
+ case <-t.done:
return
}
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@@ -1191,7 +1207,7 @@
select {
case sz := <-resp:
return int64(sz)
- case <-t.ctxDone:
+ case <-t.done:
return -1
case <-timer.C:
return -2
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 77a2cfa..8f5f334 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -78,7 +78,8 @@
codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
}
- httpStatusConvTab = map[int]codes.Code{
+ // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
+ HTTPStatusConvTab = map[int]codes.Code{
// 400 Bad Request - INTERNAL.
http.StatusBadRequest: codes.Internal,
// 401 Unauthorized - UNAUTHENTICATED.
@@ -98,9 +99,7 @@
}
)
-// Records the states during HPACK decoding. Must be reset once the
-// decoding of the entire headers are finished.
-type decodeState struct {
+type parsedHeaderData struct {
encoding string
// statusGen caches the stream status received from the trailer the server
// sent. Client side only. Do not access directly. After all trailers are
@@ -120,8 +119,30 @@
statsTags []byte
statsTrace []byte
contentSubtype string
+
+ // isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP).
+ //
+ // We are in gRPC mode (peer speaking gRPC) if:
+ // * We are client side and have already received a HEADER frame that indicates gRPC peer.
+ // * The header contains valid a content-type, i.e. a string starts with "application/grpc"
+ // And we should handle error specific to gRPC.
+ //
+ // Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we
+ // are in HTTP fallback mode, and should handle error specific to HTTP.
+ isGRPC bool
+ grpcErr error
+ httpErr error
+ contentTypeErr string
+}
+
+// decodeState configures decoding criteria and records the decoded data.
+type decodeState struct {
// whether decoding on server side or not
serverSide bool
+
+ // Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS
+ // frame once decodeHeader function has been invoked and returned.
+ data parsedHeaderData
}
// isReservedHeader checks whether hdr belongs to HTTP2 headers
@@ -202,11 +223,11 @@
}
func (d *decodeState) status() *status.Status {
- if d.statusGen == nil {
+ if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
- d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg)
+ d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
- return d.statusGen
+ return d.data.statusGen
}
const binHdrSuffix = "-bin"
@@ -244,113 +265,146 @@
if frame.Truncated {
return status.Error(codes.Internal, "peer header list size exceeded limit")
}
+
for _, hf := range frame.Fields {
- if err := d.processHeaderField(hf); err != nil {
- return err
+ d.processHeaderField(hf)
+ }
+
+ if d.data.isGRPC {
+ if d.data.grpcErr != nil {
+ return d.data.grpcErr
}
- }
-
- if d.serverSide {
+ if d.serverSide {
+ return nil
+ }
+ if d.data.rawStatusCode == nil && d.data.statusGen == nil {
+ // gRPC status doesn't exist.
+ // Set rawStatusCode to be unknown and return nil error.
+ // So that, if the stream has ended this Unknown status
+ // will be propagated to the user.
+ // Otherwise, it will be ignored. In which case, status from
+ // a later trailer, that has StreamEnded flag set, is propagated.
+ code := int(codes.Unknown)
+ d.data.rawStatusCode = &code
+ }
return nil
}
- // If grpc status exists, no need to check further.
- if d.rawStatusCode != nil || d.statusGen != nil {
- return nil
+ // HTTP fallback mode
+ if d.data.httpErr != nil {
+ return d.data.httpErr
}
- // If grpc status doesn't exist and http status doesn't exist,
- // then it's a malformed header.
- if d.httpStatus == nil {
- return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
- }
+ var (
+ code = codes.Internal // when header does not include HTTP status, return INTERNAL
+ ok bool
+ )
- if *(d.httpStatus) != http.StatusOK {
- code, ok := httpStatusConvTab[*(d.httpStatus)]
+ if d.data.httpStatus != nil {
+ code, ok = HTTPStatusConvTab[*(d.data.httpStatus)]
if !ok {
code = codes.Unknown
}
- return status.Error(code, http.StatusText(*(d.httpStatus)))
}
- // gRPC status doesn't exist and http status is OK.
- // Set rawStatusCode to be unknown and return nil error.
- // So that, if the stream has ended this Unknown status
- // will be propagated to the user.
- // Otherwise, it will be ignored. In which case, status from
- // a later trailer, that has StreamEnded flag set, is propagated.
- code := int(codes.Unknown)
- d.rawStatusCode = &code
- return nil
+ return status.Error(code, d.constructHTTPErrMsg())
+}
+
+// constructErrMsg constructs error message to be returned in HTTP fallback mode.
+// Format: HTTP status code and its corresponding message + content-type error message.
+func (d *decodeState) constructHTTPErrMsg() string {
+ var errMsgs []string
+
+ if d.data.httpStatus == nil {
+ errMsgs = append(errMsgs, "malformed header: missing HTTP status")
+ } else {
+ errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
+ }
+
+ if d.data.contentTypeErr == "" {
+ errMsgs = append(errMsgs, "transport: missing content-type field")
+ } else {
+ errMsgs = append(errMsgs, d.data.contentTypeErr)
+ }
+
+ return strings.Join(errMsgs, "; ")
}
func (d *decodeState) addMetadata(k, v string) {
- if d.mdata == nil {
- d.mdata = make(map[string][]string)
+ if d.data.mdata == nil {
+ d.data.mdata = make(map[string][]string)
}
- d.mdata[k] = append(d.mdata[k], v)
+ d.data.mdata[k] = append(d.data.mdata[k], v)
}
-func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
+func (d *decodeState) processHeaderField(f hpack.HeaderField) {
switch f.Name {
case "content-type":
contentSubtype, validContentType := contentSubtype(f.Value)
if !validContentType {
- return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
+ d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
+ return
}
- d.contentSubtype = contentSubtype
+ d.data.contentSubtype = contentSubtype
// TODO: do we want to propagate the whole content-type in the metadata,
// or come up with a way to just propagate the content-subtype if it was set?
// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
// in the metadata?
d.addMetadata(f.Name, f.Value)
+ d.data.isGRPC = true
case "grpc-encoding":
- d.encoding = f.Value
+ d.data.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
+ return
}
- d.rawStatusCode = &code
+ d.data.rawStatusCode = &code
case "grpc-message":
- d.rawStatusMsg = decodeGrpcMessage(f.Value)
+ d.data.rawStatusMsg = decodeGrpcMessage(f.Value)
case "grpc-status-details-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ return
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
- return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ return
}
- d.statusGen = status.FromProto(s)
+ d.data.statusGen = status.FromProto(s)
case "grpc-timeout":
- d.timeoutSet = true
+ d.data.timeoutSet = true
var err error
- if d.timeout, err = decodeTimeout(f.Value); err != nil {
- return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
+ if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
}
case ":path":
- d.method = f.Value
+ d.data.method = f.Value
case ":status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
+ d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
+ return
}
- d.httpStatus = &code
+ d.data.httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
+ return
}
- d.statsTags = v
+ d.data.statsTags = v
d.addMetadata(f.Name, string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(f.Value)
if err != nil {
- return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
+ d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
+ return
}
- d.statsTrace = v
+ d.data.statsTrace = v
d.addMetadata(f.Name, string(v))
default:
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
@@ -359,11 +413,10 @@
v, err := decodeMetadataHeader(f.Name, f.Value)
if err != nil {
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
- return nil
+ return
}
d.addMetadata(f.Name, v)
}
- return nil
}
type timeoutUnit uint8
@@ -614,6 +667,7 @@
writer: w,
fr: http2.NewFramer(w, r),
}
+ f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 2580aa7..1c1d106 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -22,6 +22,7 @@
package transport
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -39,10 +40,32 @@
"google.golang.org/grpc/tap"
)
+type bufferPool struct {
+ pool sync.Pool
+}
+
+func newBufferPool() *bufferPool {
+ return &bufferPool{
+ pool: sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+ },
+ }
+}
+
+func (p *bufferPool) get() *bytes.Buffer {
+ return p.pool.Get().(*bytes.Buffer)
+}
+
+func (p *bufferPool) put(b *bytes.Buffer) {
+ p.pool.Put(b)
+}
+
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
- data []byte
+ buffer *bytes.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@@ -117,8 +140,9 @@
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
- last []byte // Stores the remaining data in the previous calls.
+ last *bytes.Buffer // Stores the remaining data in the previous calls.
err error
+ freeBuffer func(*bytes.Buffer)
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -128,10 +152,13 @@
if r.err != nil {
return 0, r.err
}
- if r.last != nil && len(r.last) > 0 {
+ if r.last != nil {
// Read remaining data left in last call.
- copied := copy(p, r.last)
- r.last = r.last[copied:]
+ copied, _ := r.last.Read(p)
+ if r.last.Len() == 0 {
+ r.freeBuffer(r.last)
+ r.last = nil
+ }
return copied, nil
}
if r.closeStream != nil {
@@ -157,6 +184,19 @@
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
+ // Note that this adds the ctx error to the end of recv buffer, and
+ // reads from the head. This will delay the error until recv buffer is
+ // empty, thus will delay ctx cancellation in Recv().
+ //
+ // It's done this way to fix a race between ctx cancel and trailer. The
+ // race was, stream.Recv() may return ctx error if ctxDone wins the
+ // race, but stream.Trailer() may return a non-nil md because the stream
+ // was not marked as done when trailer is received. This closeStream
+ // call will mark stream as done, thus fix the race.
+ //
+ // TODO: delaying ctx error seems like a unnecessary side effect. What
+ // we really want is to mark the stream as done, and return ctx error
+ // faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
@@ -170,8 +210,13 @@
if m.err != nil {
return 0, m.err
}
- copied := copy(p, m.data)
- r.last = m.data[copied:]
+ copied, _ := m.buffer.Read(p)
+ if m.buffer.Len() == 0 {
+ r.freeBuffer(m.buffer)
+ r.last = nil
+ } else {
+ r.last = m.buffer
+ }
return copied, nil
}
@@ -204,8 +249,8 @@
// is used to adjust flow control, if needed.
requestRead func(int)
- headerChan chan struct{} // closed to indicate the end of header metadata.
- headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ headerChan chan struct{} // closed to indicate the end of header metadata.
+ headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -266,6 +311,14 @@
}
select {
case <-s.ctx.Done():
+ // We prefer success over failure when reading messages because we delay
+ // context error in stream.Read(). To keep behavior consistent, we also
+ // prefer success here.
+ select {
+ case <-s.headerChan:
+ return nil
+ default:
+ }
return ContextErr(s.ctx.Err())
case <-s.headerChan:
return nil
@@ -327,8 +380,7 @@
if err != nil {
return false, err
}
- // if !headerDone, some other connection error occurred.
- return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
+ return s.noHeaders, nil
}
// Trailer returns the cached trailer metedata. Note that if it is not called
@@ -579,9 +631,12 @@
// is called only once.
Close() error
- // GracefulClose starts to tear down the transport. It stops accepting
- // new RPCs and wait the completion of the pending RPCs.
- GracefulClose() error
+ // GracefulClose starts to tear down the transport: the transport will stop
+ // accepting new RPCs and NewStream will return error. Once all streams are
+ // finished, the transport will close.
+ //
+ // It does not block.
+ GracefulClose()
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
@@ -611,6 +666,9 @@
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
+ // RemoteAddr returns the remote network address.
+ RemoteAddr() net.Addr
+
// IncrMsgSent increments the number of message sent through this transport.
IncrMsgSent()