VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index ccf996b..134a624 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -30,9 +30,9 @@
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
@@ -230,17 +230,21 @@
if c.creds != nil {
callHdr.Creds = c.creds
}
- var trInfo traceInfo
+ var trInfo *traceInfo
if EnableTracing {
- trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
- trInfo.firstLine.client = true
+ trInfo = &traceInfo{
+ tr: trace.New("grpc.Sent."+methodFamily(method), method),
+ firstLine: firstLine{
+ client: true,
+ },
+ }
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = time.Until(deadline)
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
}
- ctx = newContextWithRPCInfo(ctx, c.failFast)
+ ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
sh := cc.dopts.copts.StatsHandler
var beginTime time.Time
if sh != nil {
@@ -323,13 +327,23 @@
return cs, nil
}
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
- cs.attempt = &csAttempt{
+// newAttemptLocked creates a new attempt with a transport.
+// If it succeeds, then it replaces clientStream's attempt with this new attempt.
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+ newAttempt := &csAttempt{
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
+ defer func() {
+ if retErr != nil {
+ // This attempt is not set in the clientStream, so its finish won't
+ // be called. Call it here for stats and trace in case they are not
+ // nil.
+ newAttempt.finish(retErr)
+ }
+ }()
if err := cs.ctx.Err(); err != nil {
return toRPCErr(err)
@@ -338,8 +352,12 @@
if err != nil {
return err
}
- cs.attempt.t = t
- cs.attempt.done = done
+ if trInfo != nil {
+ trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+ }
+ newAttempt.t = t
+ newAttempt.done = done
+ cs.attempt = newAttempt
return nil
}
@@ -388,11 +406,18 @@
serverHeaderBinlogged bool
mu sync.Mutex
- firstAttempt bool // if true, transparent retry is valid
- numRetries int // exclusive of transparent retry attempt(s)
- numRetriesSincePushback int // retries since pushback; to reset backoff
- finished bool // TODO: replace with atomic cmpxchg or sync.Once?
- attempt *csAttempt // the active client stream attempt
+ firstAttempt bool // if true, transparent retry is valid
+ numRetries int // exclusive of transparent retry attempt(s)
+ numRetriesSincePushback int // retries since pushback; to reset backoff
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ // attempt is the active client stream attempt.
+ // The only place where it is written is the newAttemptLocked method, and this method never writes nil.
+ // So, attempt can be nil only inside the newClientStream function when clientStream is first created.
+ // One of the first things done after clientStream's creation is to call newAttemptLocked, which either
+ // assigns a non-nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
+ // then newClientStream calls finish on the clientStream and returns. So, the finish method is the only
+ // place where we need to check if the attempt is nil.
+ attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
buffer []func(a *csAttempt) error // operations to replay on retry
@@ -414,9 +439,10 @@
decompSet bool
mu sync.Mutex // guards trInfo.tr
+ // trInfo may be nil (if EnableTracing is false).
// trInfo.tr is set when created (if EnableTracing is true),
// and cleared when the finish method is called.
- trInfo traceInfo
+ trInfo *traceInfo
statsHandler stats.Handler
}
@@ -449,8 +475,8 @@
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
}
- if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
- // First attempt, wait-for-ready, stream unprocessed: transparently retry.
+ if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
+ // First attempt, stream unprocessed: transparently retry.
cs.firstAttempt = false
return nil
}
@@ -540,7 +566,7 @@
cs.commitAttemptLocked()
return err
}
- if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
+ if err := cs.newAttemptLocked(nil, nil); err != nil {
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -668,15 +694,13 @@
if !cs.desc.ClientStreams {
cs.sentLast = true
}
- data, err := encode(cs.codec, m)
+
+ // load hdr, payload, data
+ hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
- compData, err := compress(data, cs.cp, cs.comp)
- if err != nil {
- return err
- }
- hdr, payload := msgHeader(data, compData)
+
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
@@ -799,11 +823,11 @@
}
if cs.attempt != nil {
cs.attempt.finish(err)
- }
- // after functions all rely upon having a stream.
- if cs.attempt.s != nil {
- for _, o := range cs.opts {
- o.after(cs.callInfo)
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo)
+ }
}
}
cs.cancel()
@@ -811,7 +835,7 @@
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
- if EnableTracing {
+ if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
@@ -868,7 +892,7 @@
}
return toRPCErr(err)
}
- if EnableTracing {
+ if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
@@ -881,8 +905,9 @@
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- Length: len(payInfo.uncompressedBytes),
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength,
+ Length: len(payInfo.uncompressedBytes),
})
}
if channelz.IsOn() {
@@ -915,22 +940,23 @@
// Ending a stream with EOF indicates a success.
err = nil
}
+ var tr metadata.MD
if a.s != nil {
a.t.CloseStream(a.s, err)
+ tr = a.s.Trailer()
}
if a.done != nil {
br := false
- var tr metadata.MD
if a.s != nil {
br = a.s.BytesReceived()
- tr = a.s.Trailer()
}
a.done(balancer.DoneInfo{
Err: err,
Trailer: tr,
BytesSent: a.s != nil,
BytesReceived: br,
+ ServerLoad: balancerload.Parse(tr),
})
}
if a.statsHandler != nil {
@@ -938,11 +964,12 @@
Client: true,
BeginTime: a.cs.beginTime,
EndTime: time.Now(),
+ Trailer: tr,
Error: err,
}
a.statsHandler.HandleRPC(a.cs.ctx, end)
}
- if a.trInfo.tr != nil {
+ if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
a.trInfo.tr.LazyPrintf("RPC: [OK]")
} else {
@@ -955,19 +982,18 @@
a.mu.Unlock()
}
-func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
- ac.mu.Lock()
- if ac.transport != t {
- ac.mu.Unlock()
- return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
- }
- // transition to CONNECTING state when an attempt starts
- if ac.state != connectivity.Connecting {
- ac.updateConnectivityState(connectivity.Connecting)
- ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
- }
- ac.mu.Unlock()
-
+// newClientStream creates a ClientStream with the specified transport, on the
+// given addrConn.
+//
+// It's expected that the given transport is either the same one in addrConn, or
+// is already closed. To avoid a race, transport is specified separately, instead
+// of using ac.transport.
+//
+// Main differences between this and ClientConn.NewStream:
+// - no retry
+// - no service config (or wait for service config)
+// - no tracing or stats
+func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
if t == nil {
// TODO: return RPC error here?
return nil, errors.New("transport provided is nil")
@@ -975,14 +1001,6 @@
// defaultCallInfo contains unnecessary info (i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
c := &callInfo{}
- for _, o := range opts {
- if err := o.before(c); err != nil {
- return nil, toRPCErr(err)
- }
- }
- c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
- c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
-
// Possible context leak:
// The cancel function for the child context we create will only be called
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
@@ -995,6 +1013,13 @@
}
}()
+ for _, o := range opts {
+ if err := o.before(c); err != nil {
+ return nil, toRPCErr(err)
+ }
+ }
+ c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+ c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
if err := setCallInfoCodec(c); err != nil {
return nil, err
}
@@ -1027,6 +1052,7 @@
callHdr.Creds = c.creds
}
+ // Use a special addrConnStream to avoid retry.
as := &addrConnStream{
callHdr: callHdr,
ac: ac,
@@ -1138,15 +1164,13 @@
if !as.desc.ClientStreams {
as.sentLast = true
}
- data, err := encode(as.codec, m)
+
+ // load hdr, payload, data
+ hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
if err != nil {
return err
}
- compData, err := compress(data, as.cp, as.comp)
- if err != nil {
- return err
- }
- hdr, payld := msgHeader(data, compData)
+
// TODO(dfawley): should we be checking len(data) instead?
if len(payld) > *as.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
@@ -1383,15 +1407,13 @@
ss.t.IncrMsgSent()
}
}()
- data, err := encode(ss.codec, m)
+
+ // load hdr, payload, data
+ hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil {
return err
}
- compData, err := compress(data, ss.cp, ss.comp)
- if err != nil {
- return err
- }
- hdr, payload := msgHeader(data, compData)
+
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
@@ -1466,8 +1488,9 @@
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- Length: len(payInfo.uncompressedBytes),
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength,
+ Length: len(payInfo.uncompressedBytes),
})
}
if ss.binlog != nil {
@@ -1483,3 +1506,24 @@
func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
+
+// prepareMsg returns the hdr, payload and data
+// using the compressors passed or using the
+// passed PreparedMsg.
+func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
+ if preparedMsg, ok := m.(*PreparedMsg); ok {
+ return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
+ }
+ // The input interface is not a prepared msg.
+ // Marshal and Compress the data at this point
+ data, err = encode(codec, m)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ compData, err := compress(data, cp, comp)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ hdr, payload = msgHeader(data, compData)
+ return hdr, payload, data, nil
+}