1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/encoding"
35 "google.golang.org/grpc/grpclog"
36 "google.golang.org/grpc/internal/balancerload"
37 "google.golang.org/grpc/internal/binarylog"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/grpcrand"
40 "google.golang.org/grpc/internal/transport"
41 "google.golang.org/grpc/metadata"
42 "google.golang.org/grpc/peer"
43 "google.golang.org/grpc/stats"
44 "google.golang.org/grpc/status"
45)
46
47// StreamHandler defines the handler called by the gRPC server to complete the
48// execution of a streaming RPC. If a StreamHandler returns an error, it
49// should be produced by the status package, or else gRPC will use
50// codes.Unknown as the status code and err.Error() as the status message
51// of the RPC.
52type StreamHandler func(srv interface{}, stream ServerStream) error
53
54// StreamDesc represents a streaming RPC service's method specification.
55type StreamDesc struct {
56 StreamName string
57 Handler StreamHandler
58
59 // At least one of these is true.
60 ServerStreams bool
61 ClientStreams bool
62}
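// An illustrative (hypothetical) descriptor for a bidirectional-streaming
// method; generated code normally produces values like this:
//
//	StreamDesc{
//		StreamName:    "Chat",
//		Handler:       chatHandler, // a StreamHandler
//		ServerStreams: true,
//		ClientStreams: true,
//	}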
63
64// Stream defines the common interface a client or server stream has to satisfy.
65//
66// Deprecated: See ClientStream and ServerStream documentation instead.
67type Stream interface {
68 // Deprecated: See ClientStream and ServerStream documentation instead.
69 Context() context.Context
70 // Deprecated: See ClientStream and ServerStream documentation instead.
71 SendMsg(m interface{}) error
72 // Deprecated: See ClientStream and ServerStream documentation instead.
73 RecvMsg(m interface{}) error
74}
75
76// ClientStream defines the client-side behavior of a streaming RPC.
77//
78// All errors returned from ClientStream methods are compatible with the
79// status package.
80type ClientStream interface {
81 // Header returns the header metadata received from the server if there
82 // is any. It blocks if the metadata is not ready to read.
83 Header() (metadata.MD, error)
84 // Trailer returns the trailer metadata from the server, if there is any.
85 // It must only be called after stream.CloseAndRecv has returned, or
86 // stream.Recv has returned a non-nil error (including io.EOF).
87 Trailer() metadata.MD
88 // CloseSend closes the send direction of the stream. It closes the stream
89// when a non-nil error is encountered. It is also not safe to call CloseSend
90 // concurrently with SendMsg.
91 CloseSend() error
92 // Context returns the context for this stream.
93 //
94 // It should not be called until after Header or RecvMsg has returned. Once
95 // called, subsequent client-side retries are disabled.
96 Context() context.Context
97 // SendMsg is generally called by generated code. On error, SendMsg aborts
98 // the stream. If the error was generated by the client, the status is
99 // returned directly; otherwise, io.EOF is returned and the status of
100 // the stream may be discovered using RecvMsg.
101 //
102 // SendMsg blocks until:
103 // - There is sufficient flow control to schedule m with the transport, or
104 // - The stream is done, or
105 // - The stream breaks.
106 //
107 // SendMsg does not wait until the message is received by the server. An
108 // untimely stream closure may result in lost messages. To ensure delivery,
109 // users should ensure the RPC completed successfully using RecvMsg.
110 //
111 // It is safe to have a goroutine calling SendMsg and another goroutine
112 // calling RecvMsg on the same stream at the same time, but it is not safe
113 // to call SendMsg on the same stream in different goroutines. It is also
114 // not safe to call CloseSend concurrently with SendMsg.
115 SendMsg(m interface{}) error
116 // RecvMsg blocks until it receives a message into m or the stream is
117 // done. It returns io.EOF when the stream completes successfully. On
118 // any other error, the stream is aborted and the error contains the RPC
119 // status.
120 //
121 // It is safe to have a goroutine calling SendMsg and another goroutine
122 // calling RecvMsg on the same stream at the same time, but it is not
123 // safe to call RecvMsg on the same stream in different goroutines.
124 RecvMsg(m interface{}) error
125}
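// A minimal sketch of the concurrency rules documented above (stream,
// requests, and Response are hypothetical): one goroutine may send while
// another receives, but SendMsg must not be called concurrently with itself
// or with CloseSend, and RecvMsg must not be called concurrently with itself.
//
//	go func() {
//		for _, req := range requests {
//			if err := stream.SendMsg(req); err != nil {
//				return // the RPC status is reported by RecvMsg
//			}
//		}
//		stream.CloseSend()
//	}()
//	for {
//		resp := new(Response)
//		if err := stream.RecvMsg(resp); err != nil {
//			break // io.EOF means the stream completed successfully
//		}
//	}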
126
127// NewStream creates a new Stream for the client side. This is typically
128// called by generated code. ctx is used for the lifetime of the stream.
129//
130// To ensure resources are not leaked due to the stream returned, one of the following
131// actions must be performed:
132//
133// 1. Call Close on the ClientConn.
134// 2. Cancel the context provided.
135// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
136// client-streaming RPC, for instance, might use the helper function
137// CloseAndRecv (note that CloseSend does not call Recv, and therefore is not
138// guaranteed to release all resources).
139// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
140//
141// If none of the above happen, a goroutine and a context will be leaked, and grpc
142// will not call the optionally-configured stats handler with a stats.End message.
143func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
144 // Allow the interceptor to see all applicable call options, which means those
145 // configured as defaults from dial options as well as per-call options.
146 opts = combine(cc.dopts.callOptions, opts)
147
148 if cc.dopts.streamInt != nil {
149 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
150 }
151 return newClientStream(ctx, desc, cc, method, opts...)
152}
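// A usage sketch of the cleanup rules above, assuming a hypothetical
// server-streaming method and response type (error handling elided):
//
//	ctx, cancel := context.WithCancel(ctx)
//	defer cancel() // rule 2: guarantees the stream's goroutine and context are released
//	cs, err := cc.NewStream(ctx, &StreamDesc{ServerStreams: true}, "/pkg.Service/Watch")
//	if err != nil {
//		return err
//	}
//	for {
//		m := new(Update) // hypothetical response message
//		if err := cs.RecvMsg(m); err != nil {
//			break // rule 3: draining until a non-nil error (io.EOF on success) also releases resources
//		}
//	}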
153
154// NewClientStream is a wrapper for ClientConn.NewStream.
155func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
156 return cc.NewStream(ctx, desc, method, opts...)
157}
158
159func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
160 if channelz.IsOn() {
161 cc.incrCallsStarted()
162 defer func() {
163 if err != nil {
164 cc.incrCallsFailed()
165 }
166 }()
167 }
168 c := defaultCallInfo()
169 // Provide an opportunity for the first RPC to see the first service config
170 // provided by the resolver.
171 if err := cc.waitForResolvedAddrs(ctx); err != nil {
172 return nil, err
173 }
174 mc := cc.GetMethodConfig(method)
175 if mc.WaitForReady != nil {
176 c.failFast = !*mc.WaitForReady
177 }
178
179 // Possible context leak:
180 // The cancel function for the child context we create will only be called
181 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
182 // an error is generated by SendMsg.
183 // https://github.com/grpc/grpc-go/issues/1818.
184 var cancel context.CancelFunc
185 if mc.Timeout != nil && *mc.Timeout >= 0 {
186 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
187 } else {
188 ctx, cancel = context.WithCancel(ctx)
189 }
190 defer func() {
191 if err != nil {
192 cancel()
193 }
194 }()
195
196 for _, o := range opts {
197 if err := o.before(c); err != nil {
198 return nil, toRPCErr(err)
199 }
200 }
201 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
202 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
203 if err := setCallInfoCodec(c); err != nil {
204 return nil, err
205 }
206
207 callHdr := &transport.CallHdr{
208 Host: cc.authority,
209 Method: method,
210 ContentSubtype: c.contentSubtype,
211 }
212
213 // Set our outgoing compression according to the UseCompressor CallOption, if
214 // set. In that case, also find the compressor from the encoding package.
215 // Otherwise, use the compressor configured by the WithCompressor DialOption,
216 // if set.
217 var cp Compressor
218 var comp encoding.Compressor
219 if ct := c.compressorType; ct != "" {
220 callHdr.SendCompress = ct
221 if ct != encoding.Identity {
222 comp = encoding.GetCompressor(ct)
223 if comp == nil {
224 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
225 }
226 }
227 } else if cc.dopts.cp != nil {
228 callHdr.SendCompress = cc.dopts.cp.Type()
229 cp = cc.dopts.cp
230 }
231 if c.creds != nil {
232 callHdr.Creds = c.creds
233 }
234 var trInfo *traceInfo
235 if EnableTracing {
236 trInfo = &traceInfo{
237 tr: trace.New("grpc.Sent."+methodFamily(method), method),
238 firstLine: firstLine{
239 client: true,
240 },
241 }
242 if deadline, ok := ctx.Deadline(); ok {
243 trInfo.firstLine.deadline = time.Until(deadline)
244 }
245 trInfo.tr.LazyLog(&trInfo.firstLine, false)
246 ctx = trace.NewContext(ctx, trInfo.tr)
247 }
248 ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
249 sh := cc.dopts.copts.StatsHandler
250 var beginTime time.Time
251 if sh != nil {
252 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
253 beginTime = time.Now()
254 begin := &stats.Begin{
255 Client: true,
256 BeginTime: beginTime,
257 FailFast: c.failFast,
258 }
259 sh.HandleRPC(ctx, begin)
260 }
261
262 cs := &clientStream{
263 callHdr: callHdr,
264 ctx: ctx,
265 methodConfig: &mc,
266 opts: opts,
267 callInfo: c,
268 cc: cc,
269 desc: desc,
270 codec: c.codec,
271 cp: cp,
272 comp: comp,
273 cancel: cancel,
274 beginTime: beginTime,
275 firstAttempt: true,
276 }
277 if !cc.dopts.disableRetry {
278 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
279 }
280 cs.binlog = binarylog.GetMethodLogger(method)
281
282 cs.callInfo.stream = cs
283 // Only this initial attempt has stats/tracing.
284 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
285 if err := cs.newAttemptLocked(sh, trInfo); err != nil {
286 cs.finish(err)
287 return nil, err
288 }
289
290 op := func(a *csAttempt) error { return a.newStream() }
291 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
292 cs.finish(err)
293 return nil, err
294 }
295
296 if cs.binlog != nil {
297 md, _ := metadata.FromOutgoingContext(ctx)
298 logEntry := &binarylog.ClientHeader{
299 OnClientSide: true,
300 Header: md,
301 MethodName: method,
302 Authority: cs.cc.authority,
303 }
304 if deadline, ok := ctx.Deadline(); ok {
305 logEntry.Timeout = time.Until(deadline)
306 if logEntry.Timeout < 0 {
307 logEntry.Timeout = 0
308 }
309 }
310 cs.binlog.Log(logEntry)
311 }
312
313 if desc != unaryStreamDesc {
314 // Listen on cc and stream contexts to clean up when the user closes the
315 // ClientConn or cancels the stream context. In all other cases, an error
316 // should already be injected into the recv buffer by the transport, which
317 // the client will eventually receive, and then we will cancel the stream's
318 // context in clientStream.finish.
319 go func() {
320 select {
321 case <-cc.ctx.Done():
322 cs.finish(ErrClientConnClosing)
323 case <-ctx.Done():
324 cs.finish(toRPCErr(ctx.Err()))
325 }
326 }()
327 }
328 return cs, nil
329}
330
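// newAttemptLocked creates a new csAttempt for the RPC and obtains a
// transport for it from the ClientConn. sh and trInfo are non-nil only for
// the initial attempt; stats and tracing are not yet per-attempt (see the
// TODO in newClientStream).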
331func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
332 cs.attempt = &csAttempt{
333 cs: cs,
334 dc: cs.cc.dopts.dc,
335 statsHandler: sh,
336 trInfo: trInfo,
337 }
338
339 if err := cs.ctx.Err(); err != nil {
340 return toRPCErr(err)
341 }
342 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
343 if err != nil {
344 return err
345 }
346 if trInfo != nil {
347 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
348 }
349 cs.attempt.t = t
350 cs.attempt.done = done
351 return nil
352}
353
354func (a *csAttempt) newStream() error {
355 cs := a.cs
356 cs.callHdr.PreviousAttempts = cs.numRetries
357 s, err := a.t.NewStream(cs.ctx, cs.callHdr)
358 if err != nil {
359 return toRPCErr(err)
360 }
361 cs.attempt.s = s
362 cs.attempt.p = &parser{r: s}
363 return nil
364}
365
366// clientStream implements a client side Stream.
367type clientStream struct {
368 callHdr *transport.CallHdr
369 opts []CallOption
370 callInfo *callInfo
371 cc *ClientConn
372 desc *StreamDesc
373
374 codec baseCodec
375 cp Compressor
376 comp encoding.Compressor
377
378 cancel context.CancelFunc // cancels all attempts
379
380 sentLast bool // sent an end stream
381 beginTime time.Time
382
383 methodConfig *MethodConfig
384
385 ctx context.Context // the application's context, wrapped by stats/tracing
386
387 retryThrottler *retryThrottler // The throttler active when the RPC began.
388
389 binlog *binarylog.MethodLogger // Binary logger, can be nil.
390 // serverHeaderBinlogged is a boolean for whether server header has been
391 // logged. Server header will be logged when the first time one of those
392 // happens: stream.Header(), stream.Recv().
393 //
394 // It's only read and used by Recv() and Header(), so it doesn't need to be
395 // synchronized.
396 serverHeaderBinlogged bool
397
398 mu sync.Mutex
399 firstAttempt bool // if true, transparent retry is valid
400 numRetries int // exclusive of transparent retry attempt(s)
401 numRetriesSincePushback int // retries since pushback; to reset backoff
402 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
403 attempt *csAttempt // the active client stream attempt
404 // TODO(hedging): hedging will have multiple attempts simultaneously.
405 committed bool // active attempt committed for retry?
406 buffer []func(a *csAttempt) error // operations to replay on retry
407 bufferSize int // current size of buffer
408}
409
410// csAttempt implements a single transport stream attempt within a
411// clientStream.
412type csAttempt struct {
413 cs *clientStream
414 t transport.ClientTransport
415 s *transport.Stream
416 p *parser
417 done func(balancer.DoneInfo)
418
419 finished bool
420 dc Decompressor
421 decomp encoding.Compressor
422 decompSet bool
423
424 mu sync.Mutex // guards trInfo.tr
425 // trInfo may be nil (if EnableTracing is false).
426 // trInfo.tr is set when created (if EnableTracing is true),
427 // and cleared when the finish method is called.
428 trInfo *traceInfo
429
430 statsHandler stats.Handler
431}
432
433func (cs *clientStream) commitAttemptLocked() {
434 cs.committed = true
435 cs.buffer = nil
436}
437
438func (cs *clientStream) commitAttempt() {
439 cs.mu.Lock()
440 cs.commitAttemptLocked()
441 cs.mu.Unlock()
442}
443
444// shouldRetry returns nil if the RPC should be retried; otherwise it returns
445// the error that should be returned by the operation.
446func (cs *clientStream) shouldRetry(err error) error {
447 if cs.attempt.s == nil && !cs.callInfo.failFast {
448 // In the event of any error from NewStream (attempt.s == nil), we
449 // never attempted to write anything to the wire, so we can retry
450 // indefinitely for non-fail-fast RPCs.
451 return nil
452 }
453 if cs.finished || cs.committed {
454 // RPC is finished or committed; cannot retry.
455 return err
456 }
457 // Wait for the trailers.
458 if cs.attempt.s != nil {
459 <-cs.attempt.s.Done()
460 }
461 if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
462 // First attempt, wait-for-ready, stream unprocessed: transparently retry.
463 cs.firstAttempt = false
464 return nil
465 }
466 cs.firstAttempt = false
467 if cs.cc.dopts.disableRetry {
468 return err
469 }
470
471 pushback := 0
472 hasPushback := false
473 if cs.attempt.s != nil {
474 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
475 return err
476 }
477
478 // TODO(retry): Move down if the spec changes to not check server pushback
479 // before considering this a failure for throttling.
480 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
481 if len(sps) == 1 {
482 var e error
483 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
484 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
485 cs.retryThrottler.throttle() // This counts as a failure for throttling.
486 return err
487 }
488 hasPushback = true
489 } else if len(sps) > 1 {
490 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
491 cs.retryThrottler.throttle() // This counts as a failure for throttling.
492 return err
493 }
494 }
495
496 var code codes.Code
497 if cs.attempt.s != nil {
498 code = cs.attempt.s.Status().Code()
499 } else {
500 code = status.Convert(err).Code()
501 }
502
503 rp := cs.methodConfig.retryPolicy
504 if rp == nil || !rp.retryableStatusCodes[code] {
505 return err
506 }
507
508 // Note: the ordering here is important; we count this as a failure
509 // only if the code matched a retryable code.
510 if cs.retryThrottler.throttle() {
511 return err
512 }
513 if cs.numRetries+1 >= rp.maxAttempts {
514 return err
515 }
516
517 var dur time.Duration
518 if hasPushback {
519 dur = time.Millisecond * time.Duration(pushback)
520 cs.numRetriesSincePushback = 0
521 } else {
522 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
523 cur := float64(rp.initialBackoff) * fact
524 if max := float64(rp.maxBackoff); cur > max {
525 cur = max
526 }
527 dur = time.Duration(grpcrand.Int63n(int64(cur)))
528 cs.numRetriesSincePushback++
529 }
530
531 // TODO(dfawley): we could eagerly fail here if dur puts us past the
532 // deadline, but unsure if it is worth doing.
533 t := time.NewTimer(dur)
534 select {
535 case <-t.C:
536 cs.numRetries++
537 return nil
538 case <-cs.ctx.Done():
539 t.Stop()
540 return status.FromContextError(cs.ctx.Err()).Err()
541 }
542}
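// A worked sketch of the backoff computed above, for a hypothetical policy
// with initialBackoff=100ms, backoffMultiplier=2 and maxBackoff=1s, with no
// server pushback:
//
//	retry 0: dur = rand(0, 100ms*2^0) = rand(0, 100ms)
//	retry 1: dur = rand(0, 100ms*2^1) = rand(0, 200ms)
//	retry 4: dur = rand(0, min(100ms*2^4, 1s)) = rand(0, 1s)
//
// A valid "grpc-retry-pushback-ms" trailer overrides the computed duration
// and resets numRetriesSincePushback, so the exponential schedule restarts
// after the server-directed delay.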
543
544// Returns nil if a retry was performed and succeeded; error otherwise.
545func (cs *clientStream) retryLocked(lastErr error) error {
546 for {
547 cs.attempt.finish(lastErr)
548 if err := cs.shouldRetry(lastErr); err != nil {
549 cs.commitAttemptLocked()
550 return err
551 }
552 if err := cs.newAttemptLocked(nil, nil); err != nil {
553 return err
554 }
555 if lastErr = cs.replayBufferLocked(); lastErr == nil {
556 return nil
557 }
558 }
559}
560
561func (cs *clientStream) Context() context.Context {
562 cs.commitAttempt()
563 // No need to lock before using attempt, since we know it is committed and
564 // cannot change.
565 return cs.attempt.s.Context()
566}
567
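// withRetry runs op against the current attempt, transparently switching to
// new attempts (created by retryLocked) until op succeeds, the RPC is
// committed, or shouldRetry reports a permanent error. onSuccess is invoked
// with cs.mu held after op succeeds, typically to buffer op for replay on a
// future attempt or to commit the current one.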
568func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
569 cs.mu.Lock()
570 for {
571 if cs.committed {
572 cs.mu.Unlock()
573 return op(cs.attempt)
574 }
575 a := cs.attempt
576 cs.mu.Unlock()
577 err := op(a)
578 cs.mu.Lock()
579 if a != cs.attempt {
580 // We started another attempt already.
581 continue
582 }
583 if err == io.EOF {
584 <-a.s.Done()
585 }
586 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
587 onSuccess()
588 cs.mu.Unlock()
589 return err
590 }
591 if err := cs.retryLocked(err); err != nil {
592 cs.mu.Unlock()
593 return err
594 }
595 }
596}
597
598func (cs *clientStream) Header() (metadata.MD, error) {
599 var m metadata.MD
600 err := cs.withRetry(func(a *csAttempt) error {
601 var err error
602 m, err = a.s.Header()
603 return toRPCErr(err)
604 }, cs.commitAttemptLocked)
605 if err != nil {
606 cs.finish(err)
607 return nil, err
608 }
609 if cs.binlog != nil && !cs.serverHeaderBinlogged {
610 // Only log if binary log is on and header has not been logged.
611 logEntry := &binarylog.ServerHeader{
612 OnClientSide: true,
613 Header: m,
614 PeerAddr: nil,
615 }
616 if peer, ok := peer.FromContext(cs.Context()); ok {
617 logEntry.PeerAddr = peer.Addr
618 }
619 cs.binlog.Log(logEntry)
620 cs.serverHeaderBinlogged = true
621 }
622 return m, err
623}
624
625func (cs *clientStream) Trailer() metadata.MD {
626 // On RPC failure, we never need to retry, because valid usage requires that
627 // RecvMsg() has returned a non-nil error before this function is called.
628 // We would have retried earlier if necessary.
629 //
630 // Commit the attempt anyway, just in case users are not following those
631 // directions -- it will prevent races and should not meaningfully impact
632 // performance.
633 cs.commitAttempt()
634 if cs.attempt.s == nil {
635 return nil
636 }
637 return cs.attempt.s.Trailer()
638}
639
640func (cs *clientStream) replayBufferLocked() error {
641 a := cs.attempt
642 for _, f := range cs.buffer {
643 if err := f(a); err != nil {
644 return err
645 }
646 }
647 return nil
648}
649
650func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
651 // Note: we will still buffer if retry is disabled (for transparent retries).
652 if cs.committed {
653 return
654 }
655 cs.bufferSize += sz
656 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
657 cs.commitAttemptLocked()
658 return
659 }
660 cs.buffer = append(cs.buffer, op)
661}
662
663func (cs *clientStream) SendMsg(m interface{}) (err error) {
664 defer func() {
665 if err != nil && err != io.EOF {
666 // Call finish on the client stream for errors generated by this SendMsg
667 // call, as these indicate problems created by this client. (Transport
668 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
669 // error will be returned from RecvMsg eventually in that case, or be
670 // retried.)
671 cs.finish(err)
672 }
673 }()
674 if cs.sentLast {
675 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
676 }
677 if !cs.desc.ClientStreams {
678 cs.sentLast = true
679 }
680
681 // load hdr, payload, data
682 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
683 if err != nil {
684 return err
685 }
686
687 // TODO(dfawley): should we be checking len(data) instead?
688 if len(payload) > *cs.callInfo.maxSendMessageSize {
689 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
690 }
691 msgBytes := data // Store the pointer before setting to nil. For binary logging.
692 op := func(a *csAttempt) error {
693 err := a.sendMsg(m, hdr, payload, data)
694 // nil out the message and data when replaying; they are only needed for
695 // stats, which are disabled for subsequent attempts.
696 m, data = nil, nil
697 return err
698 }
699 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
700 if cs.binlog != nil && err == nil {
701 cs.binlog.Log(&binarylog.ClientMessage{
702 OnClientSide: true,
703 Message: msgBytes,
704 })
705 }
706 return
707}
708
709func (cs *clientStream) RecvMsg(m interface{}) error {
710 if cs.binlog != nil && !cs.serverHeaderBinlogged {
711 // Call Header() to binary log header if it's not already logged.
712 cs.Header()
713 }
714 var recvInfo *payloadInfo
715 if cs.binlog != nil {
716 recvInfo = &payloadInfo{}
717 }
718 err := cs.withRetry(func(a *csAttempt) error {
719 return a.recvMsg(m, recvInfo)
720 }, cs.commitAttemptLocked)
721 if cs.binlog != nil && err == nil {
722 cs.binlog.Log(&binarylog.ServerMessage{
723 OnClientSide: true,
724 Message: recvInfo.uncompressedBytes,
725 })
726 }
727 if err != nil || !cs.desc.ServerStreams {
728 // err != nil or non-server-streaming indicates end of stream.
729 cs.finish(err)
730
731 if cs.binlog != nil {
732 // finish will not log Trailer. Log Trailer here.
733 logEntry := &binarylog.ServerTrailer{
734 OnClientSide: true,
735 Trailer: cs.Trailer(),
736 Err: err,
737 }
738 if logEntry.Err == io.EOF {
739 logEntry.Err = nil
740 }
741 if peer, ok := peer.FromContext(cs.Context()); ok {
742 logEntry.PeerAddr = peer.Addr
743 }
744 cs.binlog.Log(logEntry)
745 }
746 }
747 return err
748}
749
750func (cs *clientStream) CloseSend() error {
751 if cs.sentLast {
752 // TODO: return an error and finish the stream instead, due to API misuse?
753 return nil
754 }
755 cs.sentLast = true
756 op := func(a *csAttempt) error {
757 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
758 // Always return nil; io.EOF is the only error that might make sense
759 // instead, but there is no need to signal the client to call RecvMsg
760 // as the only use left for the stream after CloseSend is to call
761 // RecvMsg. This also matches historical behavior.
762 return nil
763 }
764 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
765 if cs.binlog != nil {
766 cs.binlog.Log(&binarylog.ClientHalfClose{
767 OnClientSide: true,
768 })
769 }
770 // CloseSend intentionally never returns an error; see the comment in op above.
771 return nil
772}
773
774func (cs *clientStream) finish(err error) {
775 if err == io.EOF {
776 // Ending a stream with EOF indicates a success.
777 err = nil
778 }
779 cs.mu.Lock()
780 if cs.finished {
781 cs.mu.Unlock()
782 return
783 }
784 cs.finished = true
785 cs.commitAttemptLocked()
786 cs.mu.Unlock()
787 // For binary logging, only log cancel in finish (could be caused by the RPC
788 // ctx being canceled or the ClientConn being closed). Trailer will be logged in RecvMsg.
789 //
790 // Only one of cancel or trailer needs to be logged. In the cases where
791 // users don't call RecvMsg, users must have already canceled the RPC.
792 if cs.binlog != nil && status.Code(err) == codes.Canceled {
793 cs.binlog.Log(&binarylog.Cancel{
794 OnClientSide: true,
795 })
796 }
797 if err == nil {
798 cs.retryThrottler.successfulRPC()
799 }
800 if channelz.IsOn() {
801 if err != nil {
802 cs.cc.incrCallsFailed()
803 } else {
804 cs.cc.incrCallsSucceeded()
805 }
806 }
807 if cs.attempt != nil {
808 cs.attempt.finish(err)
809 }
810 // The after functions all rely upon having a stream.
811 if cs.attempt.s != nil {
812 for _, o := range cs.opts {
813 o.after(cs.callInfo)
814 }
815 }
816 cs.cancel()
817}
818
819func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
820 cs := a.cs
821 if a.trInfo != nil {
822 a.mu.Lock()
823 if a.trInfo.tr != nil {
824 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
825 }
826 a.mu.Unlock()
827 }
828 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
829 if !cs.desc.ClientStreams {
830 // For non-client-streaming RPCs, we return nil instead of EOF on error
831 // because the generated code requires it. finish is not called; RecvMsg()
832 // will call it with the stream's status independently.
833 return nil
834 }
835 return io.EOF
836 }
837 if a.statsHandler != nil {
838 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
839 }
840 if channelz.IsOn() {
841 a.t.IncrMsgSent()
842 }
843 return nil
844}
845
846func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
847 cs := a.cs
848 if a.statsHandler != nil && payInfo == nil {
849 payInfo = &payloadInfo{}
850 }
851
852 if !a.decompSet {
853 // Block until we receive headers containing received message encoding.
854 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
855 if a.dc == nil || a.dc.Type() != ct {
856 // No configured decompressor, or it does not match the incoming
857 // message encoding; attempt to find a registered compressor that does.
858 a.dc = nil
859 a.decomp = encoding.GetCompressor(ct)
860 }
861 } else {
862 // No compression is used; disable our decompressor.
863 a.dc = nil
864 }
865 // Only initialize this state once per stream.
866 a.decompSet = true
867 }
868 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
869 if err != nil {
870 if err == io.EOF {
871 if statusErr := a.s.Status().Err(); statusErr != nil {
872 return statusErr
873 }
874 return io.EOF // indicates successful end of stream.
875 }
876 return toRPCErr(err)
877 }
878 if a.trInfo != nil {
879 a.mu.Lock()
880 if a.trInfo.tr != nil {
881 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
882 }
883 a.mu.Unlock()
884 }
885 if a.statsHandler != nil {
886 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
887 Client: true,
888 RecvTime: time.Now(),
889 Payload: m,
890 // TODO truncate large payload.
891 Data: payInfo.uncompressedBytes,
892 WireLength: payInfo.wireLength,
893 Length: len(payInfo.uncompressedBytes),
894 })
895 }
896 if channelz.IsOn() {
897 a.t.IncrMsgRecv()
898 }
899 if cs.desc.ServerStreams {
900 // Subsequent messages should be received by subsequent RecvMsg calls.
901 return nil
902 }
903 // Special handling for non-server-stream RPCs.
904 // This recv expects EOF or errors, so we don't collect inPayload.
905 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
906 if err == nil {
907 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
908 }
909 if err == io.EOF {
910 return a.s.Status().Err() // non-server streaming Recv returns nil on success
911 }
912 return toRPCErr(err)
913}
914
915func (a *csAttempt) finish(err error) {
916 a.mu.Lock()
917 if a.finished {
918 a.mu.Unlock()
919 return
920 }
921 a.finished = true
922 if err == io.EOF {
923 // Ending a stream with EOF indicates a success.
924 err = nil
925 }
926 var tr metadata.MD
927 if a.s != nil {
928 a.t.CloseStream(a.s, err)
929 tr = a.s.Trailer()
930 }
931
932 if a.done != nil {
933 br := false
934 if a.s != nil {
935 br = a.s.BytesReceived()
936 }
937 a.done(balancer.DoneInfo{
938 Err: err,
939 Trailer: tr,
940 BytesSent: a.s != nil,
941 BytesReceived: br,
942 ServerLoad: balancerload.Parse(tr),
943 })
944 }
945 if a.statsHandler != nil {
946 end := &stats.End{
947 Client: true,
948 BeginTime: a.cs.beginTime,
949 EndTime: time.Now(),
950 Trailer: tr,
951 Error: err,
952 }
953 a.statsHandler.HandleRPC(a.cs.ctx, end)
954 }
955 if a.trInfo != nil && a.trInfo.tr != nil {
956 if err == nil {
957 a.trInfo.tr.LazyPrintf("RPC: [OK]")
958 } else {
959 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
960 a.trInfo.tr.SetError()
961 }
962 a.trInfo.tr.Finish()
963 a.trInfo.tr = nil
964 }
965 a.mu.Unlock()
966}
967
968func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
969 ac.mu.Lock()
970 if ac.transport != t {
971 ac.mu.Unlock()
972 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
973 }
974 // transition to CONNECTING state when an attempt starts
975 if ac.state != connectivity.Connecting {
976 ac.updateConnectivityState(connectivity.Connecting)
977 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
978 }
979 ac.mu.Unlock()
980
981 if t == nil {
982 // TODO: return RPC error here?
983 return nil, errors.New("transport provided is nil")
984 }
985 // defaultCallInfo contains unnecessary info (e.g. failFast, maxRetryRPCBufferSize), so we just initialize an empty struct.
986 c := &callInfo{}
987
988 for _, o := range opts {
989 if err := o.before(c); err != nil {
990 return nil, toRPCErr(err)
991 }
992 }
993 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
994 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
995
996 // Possible context leak:
997 // The cancel function for the child context we create will only be called
998 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
999 // an error is generated by SendMsg.
1000 // https://github.com/grpc/grpc-go/issues/1818.
1001 ctx, cancel := context.WithCancel(ctx)
1002 defer func() {
1003 if err != nil {
1004 cancel()
1005 }
1006 }()
1007
1008 if err := setCallInfoCodec(c); err != nil {
1009 return nil, err
1010 }
1011
1012 callHdr := &transport.CallHdr{
1013 Host: ac.cc.authority,
1014 Method: method,
1015 ContentSubtype: c.contentSubtype,
1016 }
1017
1018 // Set our outgoing compression according to the UseCompressor CallOption, if
1019 // set. In that case, also find the compressor from the encoding package.
1020 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1021 // if set.
1022 var cp Compressor
1023 var comp encoding.Compressor
1024 if ct := c.compressorType; ct != "" {
1025 callHdr.SendCompress = ct
1026 if ct != encoding.Identity {
1027 comp = encoding.GetCompressor(ct)
1028 if comp == nil {
1029 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1030 }
1031 }
1032 } else if ac.cc.dopts.cp != nil {
1033 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1034 cp = ac.cc.dopts.cp
1035 }
1036 if c.creds != nil {
1037 callHdr.Creds = c.creds
1038 }
1039
1040 as := &addrConnStream{
1041 callHdr: callHdr,
1042 ac: ac,
1043 ctx: ctx,
1044 cancel: cancel,
1045 opts: opts,
1046 callInfo: c,
1047 desc: desc,
1048 codec: c.codec,
1049 cp: cp,
1050 comp: comp,
1051 t: t,
1052 }
1053
1054 as.callInfo.stream = as
1055 s, err := as.t.NewStream(as.ctx, as.callHdr)
1056 if err != nil {
1057 err = toRPCErr(err)
1058 return nil, err
1059 }
1060 as.s = s
1061 as.p = &parser{r: s}
1062 ac.incrCallsStarted()
1063 if desc != unaryStreamDesc {
1064 // Listen on cc and stream contexts to clean up when the user closes the
1065 // ClientConn or cancels the stream context. In all other cases, an error
1066 // should already be injected into the recv buffer by the transport, which
1067 // the client will eventually receive, and then we will cancel the stream's
1068 // context in clientStream.finish.
1069 go func() {
1070 select {
1071 case <-ac.ctx.Done():
1072 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1073 case <-ctx.Done():
1074 as.finish(toRPCErr(ctx.Err()))
1075 }
1076 }()
1077 }
1078 return as, nil
1079}
1080
1081type addrConnStream struct {
1082 s *transport.Stream
1083 ac *addrConn
1084 callHdr *transport.CallHdr
1085 cancel context.CancelFunc
1086 opts []CallOption
1087 callInfo *callInfo
1088 t transport.ClientTransport
1089 ctx context.Context
1090 sentLast bool
1091 desc *StreamDesc
1092 codec baseCodec
1093 cp Compressor
1094 comp encoding.Compressor
1095 decompSet bool
1096 dc Decompressor
1097 decomp encoding.Compressor
1098 p *parser
1099 mu sync.Mutex
1100 finished bool
1101}
1102
1103func (as *addrConnStream) Header() (metadata.MD, error) {
1104 m, err := as.s.Header()
1105 if err != nil {
1106 as.finish(toRPCErr(err))
1107 }
1108 return m, err
1109}
1110
1111func (as *addrConnStream) Trailer() metadata.MD {
1112 return as.s.Trailer()
1113}
1114
1115func (as *addrConnStream) CloseSend() error {
1116 if as.sentLast {
1117 // TODO: return an error and finish the stream instead, due to API misuse?
1118 return nil
1119 }
1120 as.sentLast = true
1121
1122 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1123 // Always return nil; io.EOF is the only error that might make sense
1124 // instead, but there is no need to signal the client to call RecvMsg
1125 // as the only use left for the stream after CloseSend is to call
1126 // RecvMsg. This also matches historical behavior.
1127 return nil
1128}
1129
1130func (as *addrConnStream) Context() context.Context {
1131 return as.s.Context()
1132}
1133
1134func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1135 defer func() {
1136 if err != nil && err != io.EOF {
1137 // Call finish on the client stream for errors generated by this SendMsg
1138 // call, as these indicate problems created by this client. (Transport
1139 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1140 // error will be returned from RecvMsg eventually in that case, or be
1141 // retried.)
1142 as.finish(err)
1143 }
1144 }()
1145 if as.sentLast {
1146 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1147 }
1148 if !as.desc.ClientStreams {
1149 as.sentLast = true
1150 }
1151
1152 // load hdr, payload, data
1153 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
1154 if err != nil {
1155 return err
1156 }
1157
1158 // TODO(dfawley): should we be checking len(data) instead?
1159 if len(payld) > *as.callInfo.maxSendMessageSize {
1160 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1161 }
1162
1163 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1164 if !as.desc.ClientStreams {
1165 // For non-client-streaming RPCs, we return nil instead of EOF on error
1166 // because the generated code requires it. finish is not called; RecvMsg()
1167 // will call it with the stream's status independently.
1168 return nil
1169 }
1170 return io.EOF
1171 }
1172
1173 if channelz.IsOn() {
1174 as.t.IncrMsgSent()
1175 }
1176 return nil
1177}
1178
1179func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1180 defer func() {
1181 if err != nil || !as.desc.ServerStreams {
1182 // err != nil or non-server-streaming indicates end of stream.
1183 as.finish(err)
1184 }
1185 }()
1186
1187 if !as.decompSet {
1188 // Block until we receive headers containing received message encoding.
1189 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1190 if as.dc == nil || as.dc.Type() != ct {
1191 // No configured decompressor, or it does not match the incoming
1192 // message encoding; attempt to find a registered compressor that does.
1193 as.dc = nil
1194 as.decomp = encoding.GetCompressor(ct)
1195 }
1196 } else {
1197 // No compression is used; disable our decompressor.
1198 as.dc = nil
1199 }
1200 // Only initialize this state once per stream.
1201 as.decompSet = true
1202 }
1203 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1204 if err != nil {
1205 if err == io.EOF {
1206 if statusErr := as.s.Status().Err(); statusErr != nil {
1207 return statusErr
1208 }
1209 return io.EOF // indicates successful end of stream.
1210 }
1211 return toRPCErr(err)
1212 }
1213
1214 if channelz.IsOn() {
1215 as.t.IncrMsgRecv()
1216 }
1217 if as.desc.ServerStreams {
1218 // Subsequent messages should be received by subsequent RecvMsg calls.
1219 return nil
1220 }
1221
1222 // Special handling for non-server-stream RPCs.
1223 // This recv expects EOF or errors, so we don't collect inPayload.
1224 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1225 if err == nil {
1226 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1227 }
1228 if err == io.EOF {
1229 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1230 }
1231 return toRPCErr(err)
1232}
1233
1234func (as *addrConnStream) finish(err error) {
1235 as.mu.Lock()
1236 if as.finished {
1237 as.mu.Unlock()
1238 return
1239 }
1240 as.finished = true
1241 if err == io.EOF {
1242 // Ending a stream with EOF indicates a success.
1243 err = nil
1244 }
1245 if as.s != nil {
1246 as.t.CloseStream(as.s, err)
1247 }
1248
1249 if err != nil {
1250 as.ac.incrCallsFailed()
1251 } else {
1252 as.ac.incrCallsSucceeded()
1253 }
1254 as.cancel()
1255 as.mu.Unlock()
1256}
1257
1258// ServerStream defines the server-side behavior of a streaming RPC.
1259//
1260// All errors returned from ServerStream methods are compatible with the
1261// status package.
1262type ServerStream interface {
1263 // SetHeader sets the header metadata. It may be called multiple times.
1264 // When called multiple times, all the provided metadata will be merged.
1265 // All the metadata will be sent out when one of the following happens:
1266 // - ServerStream.SendHeader() is called;
1267 // - The first response is sent out;
1268 // - An RPC status is sent out (error or success).
1269 SetHeader(metadata.MD) error
1270 // SendHeader sends the header metadata.
1271 // The provided md and headers set by SetHeader() will be sent.
1272 // It fails if called multiple times.
1273 SendHeader(metadata.MD) error
1274 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1275 // When called more than once, all the provided metadata will be merged.
1276 SetTrailer(metadata.MD)
1277 // Context returns the context for this stream.
1278 Context() context.Context
1279 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1280 // error is returned directly.
1281 //
1282 // SendMsg blocks until:
1283 // - There is sufficient flow control to schedule m with the transport, or
1284 // - The stream is done, or
1285 // - The stream breaks.
1286 //
1287 // SendMsg does not wait until the message is received by the client. An
1288 // untimely stream closure may result in lost messages.
1289 //
1290 // It is safe to have a goroutine calling SendMsg and another goroutine
1291 // calling RecvMsg on the same stream at the same time, but it is not safe
1292 // to call SendMsg on the same stream in different goroutines.
1293 SendMsg(m interface{}) error
1294 // RecvMsg blocks until it receives a message into m or the stream is
1295 // done. It returns io.EOF when the client has performed a CloseSend. On
1296 // any non-EOF error, the stream is aborted and the error contains the
1297 // RPC status.
1298 //
1299 // It is safe to have a goroutine calling SendMsg and another goroutine
1300 // calling RecvMsg on the same stream at the same time, but it is not
1301 // safe to call RecvMsg on the same stream in different goroutines.
1302 RecvMsg(m interface{}) error
1303}
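// A minimal server-side handler sketch for a bidirectional-streaming method
// (ChatRequest and ChatResponse are hypothetical; generated code wraps these
// calls in typed Send/Recv helpers):
//
//	func chatHandler(srv interface{}, stream ServerStream) error {
//		for {
//			req := new(ChatRequest)
//			if err := stream.RecvMsg(req); err != nil {
//				if err == io.EOF {
//					return nil // client called CloseSend; return an OK status
//				}
//				return err
//			}
//			if err := stream.SendMsg(new(ChatResponse)); err != nil {
//				return err
//			}
//		}
//	}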
1304
1305// serverStream implements a server side Stream.
1306type serverStream struct {
1307 ctx context.Context
1308 t transport.ServerTransport
1309 s *transport.Stream
1310 p *parser
1311 codec baseCodec
1312
1313 cp Compressor
1314 dc Decompressor
1315 comp encoding.Compressor
1316 decomp encoding.Compressor
1317
1318 maxReceiveMessageSize int
1319 maxSendMessageSize int
1320 trInfo *traceInfo
1321
1322 statsHandler stats.Handler
1323
1324 binlog *binarylog.MethodLogger
1325 // serverHeaderBinlogged indicates whether the server header has been logged.
1326 // It is set the first time one of the following happens: stream.SendHeader()
1327 // or stream.Send().
1328 //
1329 // It's only checked in send and sendHeader, doesn't need to be
1330 // synchronized.
1331 serverHeaderBinlogged bool
1332
1333 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1334}
1335
1336func (ss *serverStream) Context() context.Context {
1337 return ss.ctx
1338}
1339
1340func (ss *serverStream) SetHeader(md metadata.MD) error {
1341 if md.Len() == 0 {
1342 return nil
1343 }
1344 return ss.s.SetHeader(md)
1345}
1346
1347func (ss *serverStream) SendHeader(md metadata.MD) error {
1348 err := ss.t.WriteHeader(ss.s, md)
1349 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1350 h, _ := ss.s.Header()
1351 ss.binlog.Log(&binarylog.ServerHeader{
1352 Header: h,
1353 })
1354 ss.serverHeaderBinlogged = true
1355 }
1356 return err
1357}
1358
1359func (ss *serverStream) SetTrailer(md metadata.MD) {
1360 if md.Len() == 0 {
1361 return
1362 }
1363 ss.s.SetTrailer(md)
1364}
1365
1366func (ss *serverStream) SendMsg(m interface{}) (err error) {
1367 defer func() {
1368 if ss.trInfo != nil {
1369 ss.mu.Lock()
1370 if ss.trInfo.tr != nil {
1371 if err == nil {
1372 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1373 } else {
1374 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1375 ss.trInfo.tr.SetError()
1376 }
1377 }
1378 ss.mu.Unlock()
1379 }
1380 if err != nil && err != io.EOF {
1381 st, _ := status.FromError(toRPCErr(err))
1382 ss.t.WriteStatus(ss.s, st)
1383 // A non-user-specified status was sent out. This should be an error
1384 // case (e.g. a server-side cancellation).
1385 //
1386 // This is not handled specifically now. User will return a final
1387 // status from the service handler, we will log that error instead.
1388 // This behavior is similar to an interceptor.
1389 }
1390 if channelz.IsOn() && err == nil {
1391 ss.t.IncrMsgSent()
1392 }
1393 }()
1394
1395 // load hdr, payload, data
1396 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
1397 if err != nil {
1398 return err
1399 }
1400
1401 // TODO(dfawley): should we be checking len(data) instead?
1402 if len(payload) > ss.maxSendMessageSize {
1403 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1404 }
1405 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1406 return toRPCErr(err)
1407 }
1408 if ss.binlog != nil {
1409 if !ss.serverHeaderBinlogged {
1410 h, _ := ss.s.Header()
1411 ss.binlog.Log(&binarylog.ServerHeader{
1412 Header: h,
1413 })
1414 ss.serverHeaderBinlogged = true
1415 }
1416 ss.binlog.Log(&binarylog.ServerMessage{
1417 Message: data,
1418 })
1419 }
1420 if ss.statsHandler != nil {
1421 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1422 }
1423 return nil
1424}
1425
1426func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1427 defer func() {
1428 if ss.trInfo != nil {
1429 ss.mu.Lock()
1430 if ss.trInfo.tr != nil {
1431 if err == nil {
1432 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1433 } else if err != io.EOF {
1434 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1435 ss.trInfo.tr.SetError()
1436 }
1437 }
1438 ss.mu.Unlock()
1439 }
1440 if err != nil && err != io.EOF {
1441 st, _ := status.FromError(toRPCErr(err))
1442 ss.t.WriteStatus(ss.s, st)
1443 // A non-user-specified status was sent out. This should be an error
1444 // case (e.g. a server-side cancellation).
1445 //
1446 // This is not handled specifically now. User will return a final
1447 // status from the service handler, we will log that error instead.
1448 // This behavior is similar to an interceptor.
1449 }
1450 if channelz.IsOn() && err == nil {
1451 ss.t.IncrMsgRecv()
1452 }
1453 }()
1454 var payInfo *payloadInfo
1455 if ss.statsHandler != nil || ss.binlog != nil {
1456 payInfo = &payloadInfo{}
1457 }
1458 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1459 if err == io.EOF {
1460 if ss.binlog != nil {
1461 ss.binlog.Log(&binarylog.ClientHalfClose{})
1462 }
1463 return err
1464 }
1465 if err == io.ErrUnexpectedEOF {
1466 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1467 }
1468 return toRPCErr(err)
1469 }
1470 if ss.statsHandler != nil {
1471 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1472 RecvTime: time.Now(),
1473 Payload: m,
1474 // TODO truncate large payload.
1475 Data: payInfo.uncompressedBytes,
1476 WireLength: payInfo.wireLength,
1477 Length: len(payInfo.uncompressedBytes),
1478 })
1479 }
1480 if ss.binlog != nil {
1481 ss.binlog.Log(&binarylog.ClientMessage{
1482 Message: payInfo.uncompressedBytes,
1483 })
1484 }
1485 return nil
1486}
1487
1488// MethodFromServerStream returns the method string for the input stream.
1489// The returned string is in the format of "/service/method".
1490func MethodFromServerStream(stream ServerStream) (string, bool) {
1491 return Method(stream.Context())
1492}
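// For example, a stream produced by a handler registered for a hypothetical
// "/helloworld.Greeter/SayHelloStream" method yields
// ("/helloworld.Greeter/SayHelloStream", true); the second return value is
// false if no method is recorded in the stream's context.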
1493
1494 // prepareMsg returns the hdr, payload and data, either by encoding and
1495 // compressing m with the passed compressors, or, if m is a *PreparedMsg,
1496 // by reusing the values already stored in it.
1497func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
1498 if preparedMsg, ok := m.(*PreparedMsg); ok {
1499 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
1500 }
1501 // The input interface is not a prepared msg.
1502 // Marshal and Compress the data at this point
1503 data, err = encode(codec, m)
1504 if err != nil {
1505 return nil, nil, nil, err
1506 }
1507 compData, err := compress(data, cp, comp)
1508 if err != nil {
1509 return nil, nil, nil, err
1510 }
1511 hdr, payload = msgHeader(data, compData)
1512 return hdr, payload, data, nil
1513}
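// For reference, a sketch of how the three return values relate: data is the
// encoded, uncompressed message; payload is the byte slice actually written
// to the wire (the compressed form when a compressor is in use, otherwise
// data); and hdr is the length-prefixed message header (compression flag and
// length) describing payload.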