[VOL-5291] On-demand statistics for ONU and OLT
Change-Id: I4850bb0f0d2235122cb0c1bcf835b3672bb34436
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 40abdac..1009268 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,8 +36,10 @@
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
+ imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
+ istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -46,10 +48,12 @@
)
// StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC. If a StreamHandler returns an error, it
-// should be produced by the status package, or else gRPC will use
-// codes.Unknown as the status code and err.Error() as the status message
-// of the RPC.
+// execution of a streaming RPC.
+//
+// If a StreamHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
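
As a hedged illustration of the contract above (the service, stub, and message types are hypothetical placeholders, not part of this patch), a conforming handler produces its errors via the status package:

    // Bidi echoes messages back; pb.Echo_BidiServer stands in for a
    // generated stream type.
    func (s *echoServer) Bidi(stream pb.Echo_BidiServer) error {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                return nil // client finished sending; end the RPC with OK
            }
            if err != nil {
                return err // already a status (or context) error
            }
            if msg.GetPayload() == "" {
                // Returning a status error preserves the code; a plain
                // errors.New would surface to the client as codes.Unknown.
                return status.Error(codes.InvalidArgument, "empty payload")
            }
            if err := stream.Send(msg); err != nil {
                return err
            }
        }
    }
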
// StreamDesc represents a streaming RPC service's method specification. Used
@@ -119,6 +123,9 @@
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
+ //
+ // It is not safe to modify the message after calling SendMsg. Tracing
+ // libraries and stats handlers may use the message lazily.
SendMsg(m interface{}) error
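
A sketch of why this caveat matters for callers (the message type is illustrative): reusing one message object across SendMsg calls can race with a stats handler or tracer that reads the message asynchronously.

    // Risky: mutating m after SendMsg may be observed by tracing/stats code:
    //     m.Payload = a; stream.SendMsg(m); m.Payload = b // race window
    //
    // Safe: build a fresh message per send.
    for _, p := range payloads {
        m := &pb.Data{Payload: p} // new allocation each iteration
        if err := stream.SendMsg(m); err != nil {
            return err
        }
    }
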
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
@@ -148,6 +155,11 @@
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+ if err := cc.idlenessMgr.onCallBegin(); err != nil {
+ return nil, err
+ }
+ defer cc.idlenessMgr.onCallEnd()
+
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
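
onCallBegin/onCallEnd bracket every RPC so the channel cannot enter idle mode while calls are in flight. The idlenessMgr itself is internal to grpc-go; a toy equivalent of the counting it performs might look like:

    import (
        "errors"
        "sync"
    )

    type idleTracker struct {
        mu          sync.Mutex
        activeCalls int
        closed      bool
    }

    // onCallBegin fails fast if the channel is closed; otherwise it records
    // an in-flight call, which blocks the idle transition.
    func (t *idleTracker) onCallBegin() error {
        t.mu.Lock()
        defer t.mu.Unlock()
        if t.closed {
            return errors.New("grpc: the client connection is closing")
        }
        t.activeCalls++
        return nil
    }

    // onCallEnd releases the call; once activeCalls drops to zero the
    // channel becomes eligible to go idle again.
    func (t *idleTracker) onCallEnd() {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.activeCalls--
    }
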
@@ -164,6 +176,20 @@
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+ if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ // Validate the metadata set via NewOutgoingContext.
+ if err := imetadata.Validate(md); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ // Validate the flat key/value pairs appended via AppendToOutgoingContext.
+ for _, kvs := range added {
+ for i := 0; i < len(kvs); i += 2 {
+ if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ }
+ }
+ }
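
The stride-2 walk over `added` reflects how AppendToOutgoingContext stores its arguments: flat [key, value, key, value, ...] slices rather than a metadata.MD map. For example:

    // Each AppendToOutgoingContext call contributes one flat kv slice to
    // `added`; Validate/ValidatePair reject illegal keys or values (e.g. a
    // key containing a space) with codes.Internal before anything is written
    // to the wire.
    ctx := metadata.AppendToOutgoingContext(context.Background(),
        "x-request-id", "42",
        "authorization", "Bearer abc123",
    )
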
if channelz.IsOn() {
cc.incrCallsStarted()
defer func() {
@@ -187,6 +213,13 @@
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
+ if st, ok := status.FromError(err); ok {
+ // Restrict the code to the list allowed by gRFC A54.
+ if istatus.IsRestrictedControlPlaneCode(st) {
+ err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
+ }
+ return nil, err
+ }
return nil, toRPCErr(err)
}
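
istatus.IsRestrictedControlPlaneCode is internal; per one reading of gRFC A54, it flags codes reserved for the application layer, roughly:

    import "google.golang.org/grpc/codes"

    // Sketch of the A54 restriction: control-plane components (resolvers,
    // config selectors, LB pickers) must not originate these codes, so the
    // caller above rewrites them to codes.Internal.
    func isRestrictedControlPlaneCode(c codes.Code) bool {
        switch c {
        case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists,
            codes.FailedPrecondition, codes.Aborted, codes.OutOfRange,
            codes.DataLoss:
            return true
        }
        return false
    }
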
@@ -293,20 +326,35 @@
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
- cs.binlog = binarylog.GetMethodLogger(method)
-
- if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
- cs.finish(err)
- return nil, err
+ if ml := binarylog.GetMethodLogger(method); ml != nil {
+ cs.binlogs = append(cs.binlogs, ml)
+ }
+ if cc.dopts.binaryLogger != nil {
+ if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
+ cs.binlogs = append(cs.binlogs, ml)
+ }
}
- op := func(a *csAttempt) error { return a.newStream() }
+ // Pick the transport to use and create a new stream on the transport.
+ // Assign cs.attempt upon success.
+ op := func(a *csAttempt) error {
+ if err := a.getTransport(); err != nil {
+ return err
+ }
+ if err := a.newStream(); err != nil {
+ return err
+ }
+ // Because this operation is always called either here (while creating
+ // the clientStream) or by the retry code while locked when replaying
+ // the operation, it is safe to access cs.attempt directly.
+ cs.attempt = a
+ return nil
+ }
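
Because the op that picks a transport and creates the stream is buffered as entry zero, every retry re-runs it against a fresh attempt before replaying any writes. A self-contained sketch of that replay shape (names are illustrative, not grpc-go internals):

    type attempt struct{ transportReady bool }

    // replay re-runs every buffered operation against a new attempt; the
    // first op re-acquires a transport and stream, later ops re-send data.
    func replay(a *attempt, buffer []func(*attempt) error) error {
        for _, op := range buffer {
            if err := op(a); err != nil {
                return err // caller decides whether to retry on a new attempt
            }
        }
        return nil
    }
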
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
- cs.finish(err)
return nil, err
}
- if cs.binlog != nil {
+ if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{
OnClientSide: true,
@@ -320,7 +368,9 @@
logEntry.Timeout = 0
}
}
- cs.binlog.Log(logEntry)
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, logEntry)
+ }
}
if desc != unaryStreamDesc {
@@ -341,14 +391,20 @@
return cs, nil
}
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
+ if err := cs.ctx.Err(); err != nil {
+ return nil, toRPCErr(err)
+ }
+ if err := cs.cc.ctx.Err(); err != nil {
+ return nil, ErrClientConnClosing
+ }
+
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
- sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time
- if sh != nil {
+ shs := cs.cc.dopts.copts.StatsHandlers
+ for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
@@ -377,58 +433,81 @@
ctx = trace.NewContext(ctx, trInfo.tr)
}
- newAttempt := &csAttempt{
- ctx: ctx,
- beginTime: beginTime,
- cs: cs,
- dc: cs.cc.dopts.dc,
- statsHandler: sh,
- trInfo: trInfo,
- }
- defer func() {
- if retErr != nil {
- // This attempt is not set in the clientStream, so it's finish won't
- // be called. Call it here for stats and trace in case they are not
- // nil.
- newAttempt.finish(retErr)
- }
- }()
-
- if err := ctx.Err(); err != nil {
- return toRPCErr(err)
- }
-
- if cs.cc.parsedTarget.Scheme == "xds" {
+ if cs.cc.parsedTarget.URL.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
- t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+
+ return &csAttempt{
+ ctx: ctx,
+ beginTime: beginTime,
+ cs: cs,
+ dc: cs.cc.dopts.dc,
+ statsHandlers: shs,
+ trInfo: trInfo,
+ }, nil
+}
+
+func (a *csAttempt) getTransport() error {
+ cs := a.cs
+
+ var err error
+ a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
+ if de, ok := err.(dropError); ok {
+ err = de.error
+ a.drop = true
+ }
return err
}
- if trInfo != nil {
- trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+ if a.trInfo != nil {
+ a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
}
- newAttempt.t = t
- newAttempt.done = done
- cs.attempt = newAttempt
return nil
}
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
+
+ // Merge metadata stored in PickResult, if any, with existing call metadata.
+ // It is safe to overwrite the csAttempt's context here, since all state
+ // maintained in it is local to the attempt. When the attempt has to be
+ // retried, a new instance of csAttempt will be created.
+ if a.pickResult.Metadata != nil {
+ // We currently do not have a function in the metadata package which
+ // merges given metadata with existing metadata in a context. Existing
+ // function `AppendToOutgoingContext()` takes a variadic argument of key
+ // value pairs.
+ //
+ // TODO: Make it possible to retrieve key value pairs from metadata.MD
+ // in a form passable to AppendToOutgoingContext(), or create a version
+ // of AppendToOutgoingContext() that accepts a metadata.MD.
+ md, _ := metadata.FromOutgoingContext(a.ctx)
+ md = metadata.Join(md, a.pickResult.Metadata)
+ a.ctx = metadata.NewOutgoingContext(a.ctx, md)
+ }
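
metadata.Join, used above, concatenates values per key rather than overwriting, e.g.:

    base := metadata.Pairs("k", "v1")
    extra := metadata.Pairs("k", "v2", "other", "x")
    md := metadata.Join(base, extra)
    // md["k"] == []string{"v1", "v2"}: picker-supplied metadata is appended
    // to, not substituted for, the caller's outgoing metadata.
    ctx := metadata.NewOutgoingContext(context.Background(), md)
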
+
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
- // Return without converting to an RPC error so retry code can
- // inspect.
- return err
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected.
+ return err
+ }
+
+ if nse.AllowTransparentRetry {
+ a.allowTransparentRetry = true
+ }
+
+ // Unwrap and convert error.
+ return toRPCErr(nse.Err)
}
- cs.attempt.s = s
- cs.attempt.p = &parser{r: s}
+ a.s = s
+ a.p = &parser{r: s}
return nil
}
@@ -454,7 +533,7 @@
retryThrottler *retryThrottler // The throttler active when the RPC began.
- binlog *binarylog.MethodLogger // Binary logger, can be nil.
+ binlogs []binarylog.MethodLogger
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
@@ -486,12 +565,12 @@
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
- ctx context.Context
- cs *clientStream
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- done func(balancer.DoneInfo)
+ ctx context.Context
+ cs *clientStream
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ pickResult balancer.PickResult
finished bool
dc Decompressor
@@ -504,8 +583,13 @@
// and cleared when the finish method is called.
trInfo *traceInfo
- statsHandler stats.Handler
- beginTime time.Time
+ statsHandlers []stats.Handler
+ beginTime time.Time
+
+ // set for newStream errors that may be transparently retried
+ allowTransparentRetry bool
+ // set for pick errors that are returned as a status
+ drop bool
}
func (cs *clientStream) commitAttemptLocked() {
@@ -525,41 +609,21 @@
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
-func (cs *clientStream) shouldRetry(err error) (bool, error) {
- if cs.attempt.s == nil {
- // Error from NewClientStream.
- nse, ok := err.(*transport.NewStreamError)
- if !ok {
- // Unexpected, but assume no I/O was performed and the RPC is not
- // fatal, so retry indefinitely.
- return true, nil
- }
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+ cs := a.cs
- // Unwrap and convert error.
- err = toRPCErr(nse.Err)
-
- // Never retry DoNotRetry errors, which indicate the RPC should not be
- // retried due to max header list size violation, etc.
- if nse.DoNotRetry {
- return false, err
- }
-
- // In the event of a non-IO operation error from NewStream, we never
- // attempted to write anything to the wire, so we can retry
- // indefinitely.
- if !nse.DoNotTransparentRetry {
- return true, nil
- }
- }
- if cs.finished || cs.committed {
- // RPC is finished or committed; cannot retry.
+ if cs.finished || cs.committed || a.drop {
+ // RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
+ if a.s == nil && a.allowTransparentRetry {
+ return true, nil
+ }
// Wait for the trailers.
unprocessed := false
- if cs.attempt.s != nil {
- <-cs.attempt.s.Done()
- unprocessed = cs.attempt.s.Unprocessed()
+ if a.s != nil {
+ <-a.s.Done()
+ unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
@@ -571,14 +635,14 @@
pushback := 0
hasPushback := false
- if cs.attempt.s != nil {
- if !cs.attempt.s.TrailersOnly() {
+ if a.s != nil {
+ if !a.s.TrailersOnly() {
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
- sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+ sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@@ -595,10 +659,10 @@
}
var code codes.Code
- if cs.attempt.s != nil {
- code = cs.attempt.s.Status().Code()
+ if a.s != nil {
+ code = a.s.Status().Code()
} else {
- code = status.Convert(err).Code()
+ code = status.Code(err)
}
rp := cs.methodConfig.RetryPolicy
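
The RetryPolicy consulted here normally arrives via the service config. A representative config (the service name is a placeholder) that makes UNAVAILABLE retryable:

    const sc = `{
      "methodConfig": [{
        "name": [{ "service": "example.Echo" }],
        "retryPolicy": {
          "maxAttempts": 4,
          "initialBackoff": "0.1s",
          "maxBackoff": "1s",
          "backoffMultiplier": 2.0,
          "retryableStatusCodes": ["UNAVAILABLE"]
        }
      }]
    }`
    // conn, err := grpc.Dial(target, grpc.WithDefaultServiceConfig(sc))
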
@@ -643,19 +707,24 @@
}
// Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
- cs.attempt.finish(toRPCErr(lastErr))
- isTransparent, err := cs.shouldRetry(lastErr)
+ attempt.finish(toRPCErr(lastErr))
+ isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
- if err := cs.newAttemptLocked(isTransparent); err != nil {
+ attempt, err = cs.newAttemptLocked(isTransparent)
+ if err != nil {
+ // This only returns an error if the ClientConn is closed or the
+ // stream's context is canceled.
return err
}
- if lastErr = cs.replayBufferLocked(); lastErr == nil {
+ // Note that the first op in the replay buffer always sets cs.attempt
+ // if it is able to pick a transport and create a stream.
+ if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
@@ -665,7 +734,10 @@
cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and
// cannot change.
- return cs.attempt.s.Context()
+ if cs.attempt.s != nil {
+ return cs.attempt.s.Context()
+ }
+ return cs.ctx
}
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@@ -679,6 +751,18 @@
// already be status errors.
return toRPCErr(op(cs.attempt))
}
+ if len(cs.buffer) == 0 {
+ // For the first op, which controls creation of the stream and
+ // assigns cs.attempt, we need to create a new attempt inline
+ // before executing the first op. On subsequent ops, the attempt
+ // is created immediately before replaying the ops.
+ var err error
+ if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
+ cs.mu.Unlock()
+ cs.finish(err)
+ return err
+ }
+ }
a := cs.attempt
cs.mu.Unlock()
err := op(a)
@@ -695,7 +779,7 @@
cs.mu.Unlock()
return err
}
- if err := cs.retryLocked(err); err != nil {
+ if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
@@ -704,17 +788,25 @@
func (cs *clientStream) Header() (metadata.MD, error) {
var m metadata.MD
+ noHeader := false
err := cs.withRetry(func(a *csAttempt) error {
var err error
m, err = a.s.Header()
+ if err == transport.ErrNoHeaders {
+ noHeader = true
+ return nil
+ }
return toRPCErr(err)
}, cs.commitAttemptLocked)
+
if err != nil {
cs.finish(err)
return nil, err
}
- if cs.binlog != nil && !cs.serverHeaderBinlogged {
- // Only log if binary log is on and header has not been logged.
+
+ if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
+ // Only log if binary log is on and header has not been logged, and
+ // there are actually headers to log.
logEntry := &binarylog.ServerHeader{
OnClientSide: true,
Header: m,
@@ -723,10 +815,12 @@
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
- cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, logEntry)
+ }
}
- return m, err
+ return m, nil
}
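
From the caller's perspective, Header() now degrades gracefully when the server produced no headers (e.g. a trailers-only response): it returns a nil MD and a nil error instead of emitting a bogus binlog entry. Typical usage (the stub is a hypothetical generated client):

    stream, err := client.Watch(ctx, req) // hypothetical generated stub
    if err != nil {
        return err
    }
    md, err := stream.Header() // blocks until headers or trailers arrive
    if err != nil {
        return err
    }
    log.Printf("server headers: %v", md) // may be nil for trailers-only RPCs
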
func (cs *clientStream) Trailer() metadata.MD {
@@ -744,10 +838,9 @@
return cs.attempt.s.Trailer()
}
-func (cs *clientStream) replayBufferLocked() error {
- a := cs.attempt
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
- if err := f(a); err != nil {
+ if err := f(attempt); err != nil {
return err
}
}
@@ -795,47 +888,48 @@
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
- msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
- err := a.sendMsg(m, hdr, payload, data)
- // nil out the message and uncomp when replaying; they are only needed for
- // stats which is disabled for subsequent attempts.
- m, data = nil, nil
- return err
+ return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
- if cs.binlog != nil && err == nil {
- cs.binlog.Log(&binarylog.ClientMessage{
+ if len(cs.binlogs) != 0 && err == nil {
+ cm := &binarylog.ClientMessage{
OnClientSide: true,
- Message: msgBytes,
- })
+ Message: data,
+ }
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, cm)
+ }
}
- return
+ return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
- if cs.binlog != nil && !cs.serverHeaderBinlogged {
+ if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Call Header() to binary-log the header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
- if cs.binlog != nil {
+ if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
- if cs.binlog != nil && err == nil {
- cs.binlog.Log(&binarylog.ServerMessage{
+ if len(cs.binlogs) != 0 && err == nil {
+ sm := &binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
- })
+ }
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, sm)
+ }
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)
- if cs.binlog != nil {
+ if len(cs.binlogs) != 0 {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
@@ -848,7 +942,9 @@
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
- cs.binlog.Log(logEntry)
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, logEntry)
+ }
}
}
return err
@@ -869,10 +965,13 @@
return nil
}
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
- if cs.binlog != nil {
- cs.binlog.Log(&binarylog.ClientHalfClose{
+ if len(cs.binlogs) != 0 {
+ chc := &binarylog.ClientHalfClose{
OnClientSide: true,
- })
+ }
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, chc)
+ }
}
// We never returned an error here for reasons.
return nil
@@ -889,6 +988,9 @@
return
}
cs.finished = true
+ for _, onFinish := range cs.callInfo.onFinish {
+ onFinish(err)
+ }
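
These callbacks back the OnFinish CallOption (experimental when this change landed): each registered function is invoked exactly once with the RPC's final error. A usage sketch:

    // OnFinish fires after the RPC fully completes, on success and failure
    // alike; err is nil for an OK status.
    resp, err := client.Get(ctx, req, grpc.OnFinish(func(err error) {
        log.Printf("rpc finished: %v", err)
    }))
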
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
@@ -905,10 +1007,13 @@
//
// Only one of cancel or trailer needs to be logged. In the cases where
// users don't call RecvMsg, users must have already canceled the RPC.
- if cs.binlog != nil && status.Code(err) == codes.Canceled {
- cs.binlog.Log(&binarylog.Cancel{
+ if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
+ c := &binarylog.Cancel{
OnClientSide: true,
- })
+ }
+ for _, binlog := range cs.binlogs {
+ binlog.Log(cs.ctx, c)
+ }
}
if err == nil {
cs.retryThrottler.successfulRPC()
@@ -941,8 +1046,8 @@
}
return io.EOF
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -952,7 +1057,7 @@
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
cs := a.cs
- if a.statsHandler != nil && payInfo == nil {
+ if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
}
@@ -980,6 +1085,7 @@
}
return io.EOF // indicates successful end of stream.
}
+
return toRPCErr(err)
}
if a.trInfo != nil {
@@ -989,15 +1095,16 @@
}
a.mu.Unlock()
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.compressedLength + headerLen,
+ CompressedLength: payInfo.compressedLength,
+ Length: len(payInfo.uncompressedBytes),
})
}
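
With statsHandlers now a slice, several handlers can observe the same RPC (e.g. one registered globally and one per dial option); each receives the enriched InPayload above, including the new CompressedLength field. A minimal client-side stats.Handler, by way of illustration:

    import (
        "context"
        "log"

        "google.golang.org/grpc/stats"
    )

    type payloadLogger struct{}

    func (payloadLogger) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { return ctx }
    func (payloadLogger) HandleRPC(ctx context.Context, s stats.RPCStats) {
        if in, ok := s.(*stats.InPayload); ok {
            log.Printf("recv %d bytes (%d compressed on the wire)",
                in.Length, in.CompressedLength)
        }
    }
    func (payloadLogger) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
    func (payloadLogger) HandleConn(context.Context, stats.ConnStats) {}

    // conn, err := grpc.Dial(target, grpc.WithStatsHandler(payloadLogger{}))
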
if channelz.IsOn() {
@@ -1036,12 +1143,12 @@
tr = a.s.Trailer()
}
- if a.done != nil {
+ if a.pickResult.Done != nil {
br := false
if a.s != nil {
br = a.s.BytesReceived()
}
- a.done(balancer.DoneInfo{
+ a.pickResult.Done(balancer.DoneInfo{
Err: err,
Trailer: tr,
BytesSent: a.s != nil,
@@ -1049,7 +1156,7 @@
ServerLoad: balancerload.Parse(tr),
})
}
- if a.statsHandler != nil {
+ for _, sh := range a.statsHandlers {
end := &stats.End{
Client: true,
BeginTime: a.beginTime,
@@ -1057,7 +1164,7 @@
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.ctx, end)
+ sh.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
@@ -1166,14 +1273,19 @@
as.p = &parser{r: s}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
- // Listen on cc and stream contexts to cleanup when the user closes the
- // ClientConn or cancels the stream context. In all other cases, an error
- // should already be injected into the recv buffer by the transport, which
- // the client will eventually receive, and then we will cancel the stream's
- // context in clientStream.finish.
+ // Listen on stream context to cleanup when the stream context is
+ // canceled. Also listen for the addrConn's context in case the
+ // addrConn is closed or reconnects to a different address. In all
+ // other cases, an error should already be injected into the recv
+ // buffer by the transport, which the client will eventually receive,
+ // and then we will cancel the stream's context in
+ // addrConnStream.finish.
go func() {
+ ac.mu.Lock()
+ acCtx := ac.ctx
+ ac.mu.Unlock()
select {
- case <-ac.ctx.Done():
+ case <-acCtx.Done():
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-ctx.Done():
as.finish(toRPCErr(ctx.Err()))
@@ -1362,8 +1474,10 @@
// ServerStream defines the server-side behavior of a streaming RPC.
//
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package. However, the status code will often not match the RPC status as
+// seen by the client application, and therefore, should not be relied upon for
+// this purpose.
type ServerStream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When called multiple times, all the provided metadata will be merged.
@@ -1395,6 +1509,9 @@
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
+ //
+ // It is not safe to modify the message after calling SendMsg. Tracing
+ // libraries and stats handlers may use the message lazily.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the client has performed a CloseSend. On
@@ -1420,13 +1537,15 @@
comp encoding.Compressor
decomp encoding.Compressor
+ sendCompressorName string
+
maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
- statsHandler stats.Handler
+ statsHandler []stats.Handler
- binlog *binarylog.MethodLogger
+ binlogs []binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
@@ -1446,17 +1565,29 @@
if md.Len() == 0 {
return nil
}
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
- err := ss.t.WriteHeader(ss.s, md)
- if ss.binlog != nil && !ss.serverHeaderBinlogged {
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
+
+ err = ss.t.WriteHeader(ss.s, md)
+ if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
- ss.binlog.Log(&binarylog.ServerHeader{
+ sh := &binarylog.ServerHeader{
Header: h,
- })
+ }
ss.serverHeaderBinlogged = true
+ for _, binlog := range ss.binlogs {
+ binlog.Log(ss.ctx, sh)
+ }
}
return err
}
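
The handler-side effect of the validation above, sketched with placeholder generated types: an illegal header key or value now fails fast with codes.Internal instead of producing a malformed HTTP/2 header on the wire.

    func (s *server) List(req *pb.ListRequest, stream pb.Svc_ListServer) error {
        // Lowercase key, printable-ASCII value: passes imetadata.Validate.
        if err := stream.SendHeader(metadata.Pairs("x-total", "3")); err != nil {
            return err // codes.Internal for e.g. a key containing a space
        }
        return stream.Send(&pb.Item{})
    }
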
@@ -1465,6 +1596,9 @@
if md.Len() == 0 {
return
}
+ if err := imetadata.Validate(md); err != nil {
+ logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
+ }
ss.s.SetTrailer(md)
}
@@ -1497,6 +1631,13 @@
}
}()
+ // The server handler could have set a new compressor by calling
+ // SetSendCompressor. If so, use it to compress the outbound message.
+ if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
+ ss.comp = encoding.GetCompressor(sendCompressorsName)
+ ss.sendCompressorName = sendCompressorsName
+ }
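
This refresh exists to support grpc.SetSendCompressor (experimental at this version), which lets a handler switch the outbound compressor per-RPC before the first message is sent. A sketch with placeholder types:

    func (s *server) Stream(req *pb.Req, stream pb.Svc_StreamServer) error {
        // "gzip" must name a registered encoding.Compressor; the check above
        // picks up the change on the next SendMsg.
        if err := grpc.SetSendCompressor(stream.Context(), "gzip"); err != nil {
            return err
        }
        return stream.Send(&pb.Resp{})
    }
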
+
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil {
@@ -1510,20 +1651,28 @@
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
- if ss.binlog != nil {
+ if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
- ss.binlog.Log(&binarylog.ServerHeader{
+ sh := &binarylog.ServerHeader{
Header: h,
- })
+ }
ss.serverHeaderBinlogged = true
+ for _, binlog := range ss.binlogs {
+ binlog.Log(ss.ctx, sh)
+ }
}
- ss.binlog.Log(&binarylog.ServerMessage{
+ sm := &binarylog.ServerMessage{
Message: data,
- })
+ }
+ for _, binlog := range ss.binlogs {
+ binlog.Log(ss.ctx, sm)
+ }
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ }
}
return nil
}
@@ -1557,13 +1706,16 @@
}
}()
var payInfo *payloadInfo
- if ss.statsHandler != nil || ss.binlog != nil {
+ if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
if err == io.EOF {
- if ss.binlog != nil {
- ss.binlog.Log(&binarylog.ClientHalfClose{})
+ if len(ss.binlogs) != 0 {
+ chc := &binarylog.ClientHalfClose{}
+ for _, binlog := range ss.binlogs {
+ binlog.Log(ss.ctx, chc)
+ }
}
return err
}
@@ -1572,20 +1724,26 @@
}
return toRPCErr(err)
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: m,
- // TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
- })
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: m,
+ // TODO truncate large payload.
+ Data: payInfo.uncompressedBytes,
+ Length: len(payInfo.uncompressedBytes),
+ WireLength: payInfo.compressedLength + headerLen,
+ CompressedLength: payInfo.compressedLength,
+ })
+ }
}
- if ss.binlog != nil {
- ss.binlog.Log(&binarylog.ClientMessage{
+ if len(ss.binlogs) != 0 {
+ cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes,
- })
+ }
+ for _, binlog := range ss.binlogs {
+ binlog.Log(ss.ctx, cm)
+ }
}
return nil
}