1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/encoding"
35 "google.golang.org/grpc/grpclog"
36 "google.golang.org/grpc/internal/binarylog"
37 "google.golang.org/grpc/internal/channelz"
38 "google.golang.org/grpc/internal/grpcrand"
39 "google.golang.org/grpc/internal/transport"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/peer"
42 "google.golang.org/grpc/stats"
43 "google.golang.org/grpc/status"
44)
45
46// StreamHandler defines the handler called by gRPC server to complete the
47// execution of a streaming RPC. If a StreamHandler returns an error, it
48// should be produced by the status package, or else gRPC will use
49// codes.Unknown as the status code and err.Error() as the status message
50// of the RPC.
51type StreamHandler func(srv interface{}, stream ServerStream) error
52
53// StreamDesc represents a streaming RPC service's method specification.
54type StreamDesc struct {
55 StreamName string
56 Handler StreamHandler
57
58 // At least one of these is true.
59 ServerStreams bool
60 ClientStreams bool
61}
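
// The example below is illustrative only and not part of the original file:
// the method and handler names are hypothetical. It shows roughly the shape
// of the StreamDesc value that generated code registers for a
// bidirectional-streaming method, and the status-package error convention
// described in the StreamHandler documentation above.
var exampleBidiStreamDesc = StreamDesc{
	StreamName:    "ExampleMethod",
	Handler:       exampleBidiHandler,
	ServerStreams: true,
	ClientStreams: true,
}

func exampleBidiHandler(srv interface{}, stream ServerStream) error {
	// Generated code would assert srv to the concrete service implementation
	// and wrap stream in a typed Send/Recv helper; this sketch only returns a
	// status-package error, which becomes the RPC status.
	return status.Error(codes.Unimplemented, "example handler only")
}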
62
63// Stream defines the common interface a client or server stream has to satisfy.
64//
65// Deprecated: See ClientStream and ServerStream documentation instead.
66type Stream interface {
67 // Deprecated: See ClientStream and ServerStream documentation instead.
68 Context() context.Context
69 // Deprecated: See ClientStream and ServerStream documentation instead.
70 SendMsg(m interface{}) error
71 // Deprecated: See ClientStream and ServerStream documentation instead.
72 RecvMsg(m interface{}) error
73}
74
75// ClientStream defines the client-side behavior of a streaming RPC.
76//
77// All errors returned from ClientStream methods are compatible with the
78// status package.
79type ClientStream interface {
80 // Header returns the header metadata received from the server if there
81 // is any. It blocks if the metadata is not ready to read.
82 Header() (metadata.MD, error)
83 // Trailer returns the trailer metadata from the server, if there is any.
84 // It must only be called after stream.CloseAndRecv has returned, or
85 // stream.Recv has returned a non-nil error (including io.EOF).
86 Trailer() metadata.MD
 87// CloseSend closes the send direction of the stream. It closes the stream
 88// when a non-nil error is encountered. It is also not safe to call CloseSend
 89// concurrently with SendMsg.
90 CloseSend() error
91 // Context returns the context for this stream.
92 //
93 // It should not be called until after Header or RecvMsg has returned. Once
94 // called, subsequent client-side retries are disabled.
95 Context() context.Context
96 // SendMsg is generally called by generated code. On error, SendMsg aborts
97 // the stream. If the error was generated by the client, the status is
98 // returned directly; otherwise, io.EOF is returned and the status of
99 // the stream may be discovered using RecvMsg.
100 //
101 // SendMsg blocks until:
102 // - There is sufficient flow control to schedule m with the transport, or
103 // - The stream is done, or
104 // - The stream breaks.
105 //
106 // SendMsg does not wait until the message is received by the server. An
107 // untimely stream closure may result in lost messages. To ensure delivery,
108 // users should ensure the RPC completed successfully using RecvMsg.
109 //
110 // It is safe to have a goroutine calling SendMsg and another goroutine
111 // calling RecvMsg on the same stream at the same time, but it is not safe
112 // to call SendMsg on the same stream in different goroutines. It is also
113 // not safe to call CloseSend concurrently with SendMsg.
114 SendMsg(m interface{}) error
115 // RecvMsg blocks until it receives a message into m or the stream is
116 // done. It returns io.EOF when the stream completes successfully. On
117 // any other error, the stream is aborted and the error contains the RPC
118 // status.
119 //
120 // It is safe to have a goroutine calling SendMsg and another goroutine
121 // calling RecvMsg on the same stream at the same time, but it is not
122 // safe to call RecvMsg on the same stream in different goroutines.
123 RecvMsg(m interface{}) error
124}
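
// Illustrative sketch only (the helper name and the requests/newReply
// parameters are assumptions, not part of the gRPC API). It demonstrates the
// concurrency contract documented above: one goroutine may call SendMsg while
// another calls RecvMsg, but neither method may be called concurrently with
// itself, and CloseSend must not run concurrently with SendMsg.
func exampleSendAndRecv(cs ClientStream, requests []interface{}, newReply func() interface{}) error {
	go func() {
		for _, req := range requests {
			if err := cs.SendMsg(req); err != nil {
				// SendMsg aborts the stream on error; the RPC status will be
				// surfaced by RecvMsg below, so just stop sending.
				return
			}
		}
		// Half-close from the sending goroutine so CloseSend never races
		// with SendMsg.
		cs.CloseSend()
	}()
	for {
		if err := cs.RecvMsg(newReply()); err != nil {
			if err == io.EOF {
				return nil // The stream completed with an OK status.
			}
			return err // Any other error carries the RPC status.
		}
	}
}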
125
126// NewStream creates a new Stream for the client side. This is typically
127// called by generated code. ctx is used for the lifetime of the stream.
128//
129// To ensure resources are not leaked due to the stream returned, one of the following
130// actions must be performed:
131//
132// 1. Call Close on the ClientConn.
133// 2. Cancel the context provided.
134// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
135// client-streaming RPC, for instance, might use the helper function
136// CloseAndRecv (note that CloseSend does not Recv, therefore is not
137// guaranteed to release all resources).
138// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
139//
140// If none of the above happen, a goroutine and a context will be leaked, and grpc
141// will not call the optionally-configured stats handler with a stats.End message.
142func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
 143 // Allow the interceptor to see all applicable call options, which means
 144 // those configured as defaults via dial options as well as per-call options.
145 opts = combine(cc.dopts.callOptions, opts)
146
147 if cc.dopts.streamInt != nil {
148 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
149 }
150 return newClientStream(ctx, desc, cc, method, opts...)
151}
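
// A minimal sketch, not part of the original API, of how a caller can satisfy
// the cleanup rules listed in NewStream's documentation above (the helper name
// and the newReply parameter are assumptions). It cancels the derived context
// and receives until RecvMsg returns a non-nil error, so neither the goroutine
// nor the context started for the stream is leaked.
func exampleNewStreamAndDrain(ctx context.Context, cc *ClientConn, desc *StreamDesc, method string, newReply func() interface{}) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // Rule 2: canceling the context always releases the stream.
	cs, err := cc.NewStream(ctx, desc, method)
	if err != nil {
		return err
	}
	if err := cs.CloseSend(); err != nil {
		return err
	}
	for {
		// Rule 3: receive until a non-nil error is returned.
		if err := cs.RecvMsg(newReply()); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}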
152
153// NewClientStream is a wrapper for ClientConn.NewStream.
154func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
155 return cc.NewStream(ctx, desc, method, opts...)
156}
157
158func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
159 if channelz.IsOn() {
160 cc.incrCallsStarted()
161 defer func() {
162 if err != nil {
163 cc.incrCallsFailed()
164 }
165 }()
166 }
167 c := defaultCallInfo()
168 // Provide an opportunity for the first RPC to see the first service config
169 // provided by the resolver.
170 if err := cc.waitForResolvedAddrs(ctx); err != nil {
171 return nil, err
172 }
173 mc := cc.GetMethodConfig(method)
174 if mc.WaitForReady != nil {
175 c.failFast = !*mc.WaitForReady
176 }
177
178 // Possible context leak:
179 // The cancel function for the child context we create will only be called
180 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
181 // an error is generated by SendMsg.
182 // https://github.com/grpc/grpc-go/issues/1818.
183 var cancel context.CancelFunc
184 if mc.Timeout != nil && *mc.Timeout >= 0 {
185 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
186 } else {
187 ctx, cancel = context.WithCancel(ctx)
188 }
189 defer func() {
190 if err != nil {
191 cancel()
192 }
193 }()
194
195 for _, o := range opts {
196 if err := o.before(c); err != nil {
197 return nil, toRPCErr(err)
198 }
199 }
200 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
201 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
202 if err := setCallInfoCodec(c); err != nil {
203 return nil, err
204 }
205
206 callHdr := &transport.CallHdr{
207 Host: cc.authority,
208 Method: method,
209 ContentSubtype: c.contentSubtype,
210 }
211
212 // Set our outgoing compression according to the UseCompressor CallOption, if
213 // set. In that case, also find the compressor from the encoding package.
214 // Otherwise, use the compressor configured by the WithCompressor DialOption,
215 // if set.
216 var cp Compressor
217 var comp encoding.Compressor
218 if ct := c.compressorType; ct != "" {
219 callHdr.SendCompress = ct
220 if ct != encoding.Identity {
221 comp = encoding.GetCompressor(ct)
222 if comp == nil {
223 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
224 }
225 }
226 } else if cc.dopts.cp != nil {
227 callHdr.SendCompress = cc.dopts.cp.Type()
228 cp = cc.dopts.cp
229 }
230 if c.creds != nil {
231 callHdr.Creds = c.creds
232 }
233 var trInfo traceInfo
234 if EnableTracing {
235 trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
236 trInfo.firstLine.client = true
237 if deadline, ok := ctx.Deadline(); ok {
238 trInfo.firstLine.deadline = deadline.Sub(time.Now())
239 }
240 trInfo.tr.LazyLog(&trInfo.firstLine, false)
241 ctx = trace.NewContext(ctx, trInfo.tr)
242 }
243 ctx = newContextWithRPCInfo(ctx, c.failFast)
244 sh := cc.dopts.copts.StatsHandler
245 var beginTime time.Time
246 if sh != nil {
247 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
248 beginTime = time.Now()
249 begin := &stats.Begin{
250 Client: true,
251 BeginTime: beginTime,
252 FailFast: c.failFast,
253 }
254 sh.HandleRPC(ctx, begin)
255 }
256
257 cs := &clientStream{
258 callHdr: callHdr,
259 ctx: ctx,
260 methodConfig: &mc,
261 opts: opts,
262 callInfo: c,
263 cc: cc,
264 desc: desc,
265 codec: c.codec,
266 cp: cp,
267 comp: comp,
268 cancel: cancel,
269 beginTime: beginTime,
270 firstAttempt: true,
271 }
272 if !cc.dopts.disableRetry {
273 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
274 }
275 cs.binlog = binarylog.GetMethodLogger(method)
276
277 cs.callInfo.stream = cs
278 // Only this initial attempt has stats/tracing.
279 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
280 if err := cs.newAttemptLocked(sh, trInfo); err != nil {
281 cs.finish(err)
282 return nil, err
283 }
284
285 op := func(a *csAttempt) error { return a.newStream() }
286 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
287 cs.finish(err)
288 return nil, err
289 }
290
291 if cs.binlog != nil {
292 md, _ := metadata.FromOutgoingContext(ctx)
293 logEntry := &binarylog.ClientHeader{
294 OnClientSide: true,
295 Header: md,
296 MethodName: method,
297 Authority: cs.cc.authority,
298 }
299 if deadline, ok := ctx.Deadline(); ok {
300 logEntry.Timeout = deadline.Sub(time.Now())
301 if logEntry.Timeout < 0 {
302 logEntry.Timeout = 0
303 }
304 }
305 cs.binlog.Log(logEntry)
306 }
307
308 if desc != unaryStreamDesc {
 309 // Listen on cc and stream contexts to clean up when the user closes the
310 // ClientConn or cancels the stream context. In all other cases, an error
311 // should already be injected into the recv buffer by the transport, which
312 // the client will eventually receive, and then we will cancel the stream's
313 // context in clientStream.finish.
314 go func() {
315 select {
316 case <-cc.ctx.Done():
317 cs.finish(ErrClientConnClosing)
318 case <-ctx.Done():
319 cs.finish(toRPCErr(ctx.Err()))
320 }
321 }()
322 }
323 return cs, nil
324}
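
// Sketch of the caller-side counterpart to the compression selection above
// (assumptions: a "gzip" compressor has been registered with the encoding
// package, and the method name is hypothetical). A UseCompressor call option
// sets the per-call compressor type, which takes precedence over any
// WithCompressor dial option.
func exampleCompressedStream(ctx context.Context, cc *ClientConn, desc *StreamDesc) (ClientStream, error) {
	return cc.NewStream(ctx, desc, "/example.Service/Method", UseCompressor("gzip"))
}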
325
326func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
327 cs.attempt = &csAttempt{
328 cs: cs,
329 dc: cs.cc.dopts.dc,
330 statsHandler: sh,
331 trInfo: trInfo,
332 }
333
334 if err := cs.ctx.Err(); err != nil {
335 return toRPCErr(err)
336 }
337 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
338 if err != nil {
339 return err
340 }
341 cs.attempt.t = t
342 cs.attempt.done = done
343 return nil
344}
345
346func (a *csAttempt) newStream() error {
347 cs := a.cs
348 cs.callHdr.PreviousAttempts = cs.numRetries
349 s, err := a.t.NewStream(cs.ctx, cs.callHdr)
350 if err != nil {
351 return toRPCErr(err)
352 }
353 cs.attempt.s = s
354 cs.attempt.p = &parser{r: s}
355 return nil
356}
357
358// clientStream implements a client side Stream.
359type clientStream struct {
360 callHdr *transport.CallHdr
361 opts []CallOption
362 callInfo *callInfo
363 cc *ClientConn
364 desc *StreamDesc
365
366 codec baseCodec
367 cp Compressor
368 comp encoding.Compressor
369
370 cancel context.CancelFunc // cancels all attempts
371
372 sentLast bool // sent an end stream
373 beginTime time.Time
374
375 methodConfig *MethodConfig
376
377 ctx context.Context // the application's context, wrapped by stats/tracing
378
379 retryThrottler *retryThrottler // The throttler active when the RPC began.
380
381 binlog *binarylog.MethodLogger // Binary logger, can be nil.
 382 // serverHeaderBinlogged records whether the server header has been logged.
 383 // The server header is logged the first time one of the following is
 384 // called: stream.Header(), stream.Recv().
385 //
386 // It's only read and used by Recv() and Header(), so it doesn't need to be
387 // synchronized.
388 serverHeaderBinlogged bool
389
390 mu sync.Mutex
391 firstAttempt bool // if true, transparent retry is valid
392 numRetries int // exclusive of transparent retry attempt(s)
393 numRetriesSincePushback int // retries since pushback; to reset backoff
394 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
395 attempt *csAttempt // the active client stream attempt
396 // TODO(hedging): hedging will have multiple attempts simultaneously.
397 committed bool // active attempt committed for retry?
398 buffer []func(a *csAttempt) error // operations to replay on retry
399 bufferSize int // current size of buffer
400}
401
402// csAttempt implements a single transport stream attempt within a
403// clientStream.
404type csAttempt struct {
405 cs *clientStream
406 t transport.ClientTransport
407 s *transport.Stream
408 p *parser
409 done func(balancer.DoneInfo)
410
411 finished bool
412 dc Decompressor
413 decomp encoding.Compressor
414 decompSet bool
415
416 mu sync.Mutex // guards trInfo.tr
417 // trInfo.tr is set when created (if EnableTracing is true),
418 // and cleared when the finish method is called.
419 trInfo traceInfo
420
421 statsHandler stats.Handler
422}
423
424func (cs *clientStream) commitAttemptLocked() {
425 cs.committed = true
426 cs.buffer = nil
427}
428
429func (cs *clientStream) commitAttempt() {
430 cs.mu.Lock()
431 cs.commitAttemptLocked()
432 cs.mu.Unlock()
433}
434
435// shouldRetry returns nil if the RPC should be retried; otherwise it returns
436// the error that should be returned by the operation.
437func (cs *clientStream) shouldRetry(err error) error {
438 if cs.attempt.s == nil && !cs.callInfo.failFast {
439 // In the event of any error from NewStream (attempt.s == nil), we
440 // never attempted to write anything to the wire, so we can retry
441 // indefinitely for non-fail-fast RPCs.
442 return nil
443 }
444 if cs.finished || cs.committed {
445 // RPC is finished or committed; cannot retry.
446 return err
447 }
448 // Wait for the trailers.
449 if cs.attempt.s != nil {
450 <-cs.attempt.s.Done()
451 }
452 if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
453 // First attempt, wait-for-ready, stream unprocessed: transparently retry.
454 cs.firstAttempt = false
455 return nil
456 }
457 cs.firstAttempt = false
458 if cs.cc.dopts.disableRetry {
459 return err
460 }
461
462 pushback := 0
463 hasPushback := false
464 if cs.attempt.s != nil {
465 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil {
466 // Context error; stop now.
467 return toErr
468 } else if !to {
469 return err
470 }
471
472 // TODO(retry): Move down if the spec changes to not check server pushback
473 // before considering this a failure for throttling.
474 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
475 if len(sps) == 1 {
476 var e error
477 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
478 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
479 cs.retryThrottler.throttle() // This counts as a failure for throttling.
480 return err
481 }
482 hasPushback = true
483 } else if len(sps) > 1 {
484 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
485 cs.retryThrottler.throttle() // This counts as a failure for throttling.
486 return err
487 }
488 }
489
490 var code codes.Code
491 if cs.attempt.s != nil {
492 code = cs.attempt.s.Status().Code()
493 } else {
494 code = status.Convert(err).Code()
495 }
496
497 rp := cs.methodConfig.retryPolicy
498 if rp == nil || !rp.retryableStatusCodes[code] {
499 return err
500 }
501
502 // Note: the ordering here is important; we count this as a failure
503 // only if the code matched a retryable code.
504 if cs.retryThrottler.throttle() {
505 return err
506 }
507 if cs.numRetries+1 >= rp.maxAttempts {
508 return err
509 }
510
511 var dur time.Duration
512 if hasPushback {
513 dur = time.Millisecond * time.Duration(pushback)
514 cs.numRetriesSincePushback = 0
515 } else {
516 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
517 cur := float64(rp.initialBackoff) * fact
518 if max := float64(rp.maxBackoff); cur > max {
519 cur = max
520 }
521 dur = time.Duration(grpcrand.Int63n(int64(cur)))
522 cs.numRetriesSincePushback++
523 }
524
525 // TODO(dfawley): we could eagerly fail here if dur puts us past the
526 // deadline, but unsure if it is worth doing.
527 t := time.NewTimer(dur)
528 select {
529 case <-t.C:
530 cs.numRetries++
531 return nil
532 case <-cs.ctx.Done():
533 t.Stop()
534 return status.FromContextError(cs.ctx.Err()).Err()
535 }
536}
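
// The helper below restates, for illustration only, the backoff computation
// used by shouldRetry above; it is not called by the real code, and its name
// and parameters are assumptions. Without server pushback, the n-th retry
// waits a uniformly random duration in [0, min(initialBackoff*multiplier^n,
// maxBackoff)); a pushback value from the server overrides the computed
// delay. initialBackoff is assumed to be positive.
func exampleRetryBackoff(initialBackoff, maxBackoff time.Duration, multiplier float64, retriesSincePushback int, pushback time.Duration, hasPushback bool) time.Duration {
	if hasPushback {
		return pushback
	}
	cur := float64(initialBackoff) * math.Pow(multiplier, float64(retriesSincePushback))
	if max := float64(maxBackoff); cur > max {
		cur = max
	}
	return time.Duration(grpcrand.Int63n(int64(cur)))
}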
537
538// Returns nil if a retry was performed and succeeded; error otherwise.
539func (cs *clientStream) retryLocked(lastErr error) error {
540 for {
541 cs.attempt.finish(lastErr)
542 if err := cs.shouldRetry(lastErr); err != nil {
543 cs.commitAttemptLocked()
544 return err
545 }
546 if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
547 return err
548 }
549 if lastErr = cs.replayBufferLocked(); lastErr == nil {
550 return nil
551 }
552 }
553}
554
555func (cs *clientStream) Context() context.Context {
556 cs.commitAttempt()
557 // No need to lock before using attempt, since we know it is committed and
558 // cannot change.
559 return cs.attempt.s.Context()
560}
561
562func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
563 cs.mu.Lock()
564 for {
565 if cs.committed {
566 cs.mu.Unlock()
567 return op(cs.attempt)
568 }
569 a := cs.attempt
570 cs.mu.Unlock()
571 err := op(a)
572 cs.mu.Lock()
573 if a != cs.attempt {
574 // We started another attempt already.
575 continue
576 }
577 if err == io.EOF {
578 <-a.s.Done()
579 }
580 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
581 onSuccess()
582 cs.mu.Unlock()
583 return err
584 }
585 if err := cs.retryLocked(err); err != nil {
586 cs.mu.Unlock()
587 return err
588 }
589 }
590}
591
592func (cs *clientStream) Header() (metadata.MD, error) {
593 var m metadata.MD
594 err := cs.withRetry(func(a *csAttempt) error {
595 var err error
596 m, err = a.s.Header()
597 return toRPCErr(err)
598 }, cs.commitAttemptLocked)
599 if err != nil {
600 cs.finish(err)
601 return nil, err
602 }
603 if cs.binlog != nil && !cs.serverHeaderBinlogged {
604 // Only log if binary log is on and header has not been logged.
605 logEntry := &binarylog.ServerHeader{
606 OnClientSide: true,
607 Header: m,
608 PeerAddr: nil,
609 }
610 if peer, ok := peer.FromContext(cs.Context()); ok {
611 logEntry.PeerAddr = peer.Addr
612 }
613 cs.binlog.Log(logEntry)
614 cs.serverHeaderBinlogged = true
615 }
616 return m, err
617}
618
619func (cs *clientStream) Trailer() metadata.MD {
 620 // On RPC failure, we never need to retry: correct usage requires that
 621 // RecvMsg() has returned a non-nil error before Trailer() is called, so any
 622 // necessary retry would already have happened.
623 //
624 // Commit the attempt anyway, just in case users are not following those
625 // directions -- it will prevent races and should not meaningfully impact
626 // performance.
627 cs.commitAttempt()
628 if cs.attempt.s == nil {
629 return nil
630 }
631 return cs.attempt.s.Trailer()
632}
633
634func (cs *clientStream) replayBufferLocked() error {
635 a := cs.attempt
636 for _, f := range cs.buffer {
637 if err := f(a); err != nil {
638 return err
639 }
640 }
641 return nil
642}
643
644func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
645 // Note: we still will buffer if retry is disabled (for transparent retries).
646 if cs.committed {
647 return
648 }
649 cs.bufferSize += sz
650 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
651 cs.commitAttemptLocked()
652 return
653 }
654 cs.buffer = append(cs.buffer, op)
655}
656
657func (cs *clientStream) SendMsg(m interface{}) (err error) {
658 defer func() {
659 if err != nil && err != io.EOF {
660 // Call finish on the client stream for errors generated by this SendMsg
661 // call, as these indicate problems created by this client. (Transport
662 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
663 // error will be returned from RecvMsg eventually in that case, or be
664 // retried.)
665 cs.finish(err)
666 }
667 }()
668 if cs.sentLast {
669 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
670 }
671 if !cs.desc.ClientStreams {
672 cs.sentLast = true
673 }
674 data, err := encode(cs.codec, m)
675 if err != nil {
676 return err
677 }
678 compData, err := compress(data, cs.cp, cs.comp)
679 if err != nil {
680 return err
681 }
682 hdr, payload := msgHeader(data, compData)
683 // TODO(dfawley): should we be checking len(data) instead?
684 if len(payload) > *cs.callInfo.maxSendMessageSize {
685 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
686 }
687 msgBytes := data // Store the pointer before setting to nil. For binary logging.
688 op := func(a *csAttempt) error {
689 err := a.sendMsg(m, hdr, payload, data)
690 // nil out the message and uncomp when replaying; they are only needed for
691 // stats which is disabled for subsequent attempts.
692 m, data = nil, nil
693 return err
694 }
695 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
696 if cs.binlog != nil && err == nil {
697 cs.binlog.Log(&binarylog.ClientMessage{
698 OnClientSide: true,
699 Message: msgBytes,
700 })
701 }
702 return
703}
704
705func (cs *clientStream) RecvMsg(m interface{}) error {
706 if cs.binlog != nil && !cs.serverHeaderBinlogged {
707 // Call Header() to binary log header if it's not already logged.
708 cs.Header()
709 }
710 var recvInfo *payloadInfo
711 if cs.binlog != nil {
712 recvInfo = &payloadInfo{}
713 }
714 err := cs.withRetry(func(a *csAttempt) error {
715 return a.recvMsg(m, recvInfo)
716 }, cs.commitAttemptLocked)
717 if cs.binlog != nil && err == nil {
718 cs.binlog.Log(&binarylog.ServerMessage{
719 OnClientSide: true,
720 Message: recvInfo.uncompressedBytes,
721 })
722 }
723 if err != nil || !cs.desc.ServerStreams {
724 // err != nil or non-server-streaming indicates end of stream.
725 cs.finish(err)
726
727 if cs.binlog != nil {
728 // finish will not log Trailer. Log Trailer here.
729 logEntry := &binarylog.ServerTrailer{
730 OnClientSide: true,
731 Trailer: cs.Trailer(),
732 Err: err,
733 }
734 if logEntry.Err == io.EOF {
735 logEntry.Err = nil
736 }
737 if peer, ok := peer.FromContext(cs.Context()); ok {
738 logEntry.PeerAddr = peer.Addr
739 }
740 cs.binlog.Log(logEntry)
741 }
742 }
743 return err
744}
745
746func (cs *clientStream) CloseSend() error {
747 if cs.sentLast {
748 // TODO: return an error and finish the stream instead, due to API misuse?
749 return nil
750 }
751 cs.sentLast = true
752 op := func(a *csAttempt) error {
753 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
754 // Always return nil; io.EOF is the only error that might make sense
755 // instead, but there is no need to signal the client to call RecvMsg
756 // as the only use left for the stream after CloseSend is to call
757 // RecvMsg. This also matches historical behavior.
758 return nil
759 }
760 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
761 if cs.binlog != nil {
762 cs.binlog.Log(&binarylog.ClientHalfClose{
763 OnClientSide: true,
764 })
765 }
 766 // CloseSend never returns an error; see the comment inside op above.
767 return nil
768}
769
770func (cs *clientStream) finish(err error) {
771 if err == io.EOF {
772 // Ending a stream with EOF indicates a success.
773 err = nil
774 }
775 cs.mu.Lock()
776 if cs.finished {
777 cs.mu.Unlock()
778 return
779 }
780 cs.finished = true
781 cs.commitAttemptLocked()
782 cs.mu.Unlock()
 783 // For binary logging: only log cancel in finish (it could be caused by the
 784 // RPC ctx being canceled or the ClientConn being closed). Trailer will be logged in RecvMsg.
785 //
786 // Only one of cancel or trailer needs to be logged. In the cases where
787 // users don't call RecvMsg, users must have already canceled the RPC.
788 if cs.binlog != nil && status.Code(err) == codes.Canceled {
789 cs.binlog.Log(&binarylog.Cancel{
790 OnClientSide: true,
791 })
792 }
793 if err == nil {
794 cs.retryThrottler.successfulRPC()
795 }
796 if channelz.IsOn() {
797 if err != nil {
798 cs.cc.incrCallsFailed()
799 } else {
800 cs.cc.incrCallsSucceeded()
801 }
802 }
803 if cs.attempt != nil {
804 cs.attempt.finish(err)
805 }
 806 // The after() functions of the call options all rely upon having a stream.
807 if cs.attempt.s != nil {
808 for _, o := range cs.opts {
809 o.after(cs.callInfo)
810 }
811 }
812 cs.cancel()
813}
814
815func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
816 cs := a.cs
817 if EnableTracing {
818 a.mu.Lock()
819 if a.trInfo.tr != nil {
820 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
821 }
822 a.mu.Unlock()
823 }
824 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
825 if !cs.desc.ClientStreams {
826 // For non-client-streaming RPCs, we return nil instead of EOF on error
827 // because the generated code requires it. finish is not called; RecvMsg()
828 // will call it with the stream's status independently.
829 return nil
830 }
831 return io.EOF
832 }
833 if a.statsHandler != nil {
834 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
835 }
836 if channelz.IsOn() {
837 a.t.IncrMsgSent()
838 }
839 return nil
840}
841
842func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
843 cs := a.cs
844 if a.statsHandler != nil && payInfo == nil {
845 payInfo = &payloadInfo{}
846 }
847
848 if !a.decompSet {
849 // Block until we receive headers containing received message encoding.
850 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
851 if a.dc == nil || a.dc.Type() != ct {
852 // No configured decompressor, or it does not match the incoming
853 // message encoding; attempt to find a registered compressor that does.
854 a.dc = nil
855 a.decomp = encoding.GetCompressor(ct)
856 }
857 } else {
858 // No compression is used; disable our decompressor.
859 a.dc = nil
860 }
861 // Only initialize this state once per stream.
862 a.decompSet = true
863 }
864 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
865 if err != nil {
866 if err == io.EOF {
867 if statusErr := a.s.Status().Err(); statusErr != nil {
868 return statusErr
869 }
870 return io.EOF // indicates successful end of stream.
871 }
872 return toRPCErr(err)
873 }
874 if EnableTracing {
875 a.mu.Lock()
876 if a.trInfo.tr != nil {
877 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
878 }
879 a.mu.Unlock()
880 }
881 if a.statsHandler != nil {
882 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
883 Client: true,
884 RecvTime: time.Now(),
885 Payload: m,
886 // TODO truncate large payload.
887 Data: payInfo.uncompressedBytes,
888 Length: len(payInfo.uncompressedBytes),
889 })
890 }
891 if channelz.IsOn() {
892 a.t.IncrMsgRecv()
893 }
894 if cs.desc.ServerStreams {
895 // Subsequent messages should be received by subsequent RecvMsg calls.
896 return nil
897 }
898 // Special handling for non-server-stream rpcs.
899 // This recv expects EOF or errors, so we don't collect inPayload.
900 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
901 if err == nil {
902 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
903 }
904 if err == io.EOF {
905 return a.s.Status().Err() // non-server streaming Recv returns nil on success
906 }
907 return toRPCErr(err)
908}
909
910func (a *csAttempt) finish(err error) {
911 a.mu.Lock()
912 if a.finished {
913 a.mu.Unlock()
914 return
915 }
916 a.finished = true
917 if err == io.EOF {
918 // Ending a stream with EOF indicates a success.
919 err = nil
920 }
921 if a.s != nil {
922 a.t.CloseStream(a.s, err)
923 }
924
925 if a.done != nil {
926 br := false
927 var tr metadata.MD
928 if a.s != nil {
929 br = a.s.BytesReceived()
930 tr = a.s.Trailer()
931 }
932 a.done(balancer.DoneInfo{
933 Err: err,
934 Trailer: tr,
935 BytesSent: a.s != nil,
936 BytesReceived: br,
937 })
938 }
939 if a.statsHandler != nil {
940 end := &stats.End{
941 Client: true,
942 BeginTime: a.cs.beginTime,
943 EndTime: time.Now(),
944 Error: err,
945 }
946 a.statsHandler.HandleRPC(a.cs.ctx, end)
947 }
948 if a.trInfo.tr != nil {
949 if err == nil {
950 a.trInfo.tr.LazyPrintf("RPC: [OK]")
951 } else {
952 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
953 a.trInfo.tr.SetError()
954 }
955 a.trInfo.tr.Finish()
956 a.trInfo.tr = nil
957 }
958 a.mu.Unlock()
959}
960
961func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
962 ac.mu.Lock()
963 if ac.transport != t {
964 ac.mu.Unlock()
965 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
966 }
967 // transition to CONNECTING state when an attempt starts
968 if ac.state != connectivity.Connecting {
969 ac.updateConnectivityState(connectivity.Connecting)
970 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
971 }
972 ac.mu.Unlock()
973
974 if t == nil {
975 // TODO: return RPC error here?
976 return nil, errors.New("transport provided is nil")
977 }
 978 // defaultCallInfo contains unnecessary info (e.g. failFast, maxRetryRPCBufferSize), so we just initialize an empty struct.
979 c := &callInfo{}
980
981 for _, o := range opts {
982 if err := o.before(c); err != nil {
983 return nil, toRPCErr(err)
984 }
985 }
986 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
987 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
988
989 // Possible context leak:
990 // The cancel function for the child context we create will only be called
991 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
992 // an error is generated by SendMsg.
993 // https://github.com/grpc/grpc-go/issues/1818.
994 ctx, cancel := context.WithCancel(ctx)
995 defer func() {
996 if err != nil {
997 cancel()
998 }
999 }()
1000
1001 if err := setCallInfoCodec(c); err != nil {
1002 return nil, err
1003 }
1004
1005 callHdr := &transport.CallHdr{
1006 Host: ac.cc.authority,
1007 Method: method,
1008 ContentSubtype: c.contentSubtype,
1009 }
1010
1011 // Set our outgoing compression according to the UseCompressor CallOption, if
1012 // set. In that case, also find the compressor from the encoding package.
1013 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1014 // if set.
1015 var cp Compressor
1016 var comp encoding.Compressor
1017 if ct := c.compressorType; ct != "" {
1018 callHdr.SendCompress = ct
1019 if ct != encoding.Identity {
1020 comp = encoding.GetCompressor(ct)
1021 if comp == nil {
1022 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1023 }
1024 }
1025 } else if ac.cc.dopts.cp != nil {
1026 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1027 cp = ac.cc.dopts.cp
1028 }
1029 if c.creds != nil {
1030 callHdr.Creds = c.creds
1031 }
1032
1033 as := &addrConnStream{
1034 callHdr: callHdr,
1035 ac: ac,
1036 ctx: ctx,
1037 cancel: cancel,
1038 opts: opts,
1039 callInfo: c,
1040 desc: desc,
1041 codec: c.codec,
1042 cp: cp,
1043 comp: comp,
1044 t: t,
1045 }
1046
1047 as.callInfo.stream = as
1048 s, err := as.t.NewStream(as.ctx, as.callHdr)
1049 if err != nil {
1050 err = toRPCErr(err)
1051 return nil, err
1052 }
1053 as.s = s
1054 as.p = &parser{r: s}
1055 ac.incrCallsStarted()
1056 if desc != unaryStreamDesc {
 1057 // Listen on cc and stream contexts to clean up when the user closes the
1058 // ClientConn or cancels the stream context. In all other cases, an error
1059 // should already be injected into the recv buffer by the transport, which
1060 // the client will eventually receive, and then we will cancel the stream's
1061 // context in clientStream.finish.
1062 go func() {
1063 select {
1064 case <-ac.ctx.Done():
1065 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1066 case <-ctx.Done():
1067 as.finish(toRPCErr(ctx.Err()))
1068 }
1069 }()
1070 }
1071 return as, nil
1072}
1073
1074type addrConnStream struct {
1075 s *transport.Stream
1076 ac *addrConn
1077 callHdr *transport.CallHdr
1078 cancel context.CancelFunc
1079 opts []CallOption
1080 callInfo *callInfo
1081 t transport.ClientTransport
1082 ctx context.Context
1083 sentLast bool
1084 desc *StreamDesc
1085 codec baseCodec
1086 cp Compressor
1087 comp encoding.Compressor
1088 decompSet bool
1089 dc Decompressor
1090 decomp encoding.Compressor
1091 p *parser
1092 done func(balancer.DoneInfo)
1093 mu sync.Mutex
1094 finished bool
1095}
1096
1097func (as *addrConnStream) Header() (metadata.MD, error) {
1098 m, err := as.s.Header()
1099 if err != nil {
1100 as.finish(toRPCErr(err))
1101 }
1102 return m, err
1103}
1104
1105func (as *addrConnStream) Trailer() metadata.MD {
1106 return as.s.Trailer()
1107}
1108
1109func (as *addrConnStream) CloseSend() error {
1110 if as.sentLast {
1111 // TODO: return an error and finish the stream instead, due to API misuse?
1112 return nil
1113 }
1114 as.sentLast = true
1115
1116 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1117 // Always return nil; io.EOF is the only error that might make sense
1118 // instead, but there is no need to signal the client to call RecvMsg
1119 // as the only use left for the stream after CloseSend is to call
1120 // RecvMsg. This also matches historical behavior.
1121 return nil
1122}
1123
1124func (as *addrConnStream) Context() context.Context {
1125 return as.s.Context()
1126}
1127
1128func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1129 defer func() {
1130 if err != nil && err != io.EOF {
1131 // Call finish on the client stream for errors generated by this SendMsg
1132 // call, as these indicate problems created by this client. (Transport
1133 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1134 // error will be returned from RecvMsg eventually in that case, or be
1135 // retried.)
1136 as.finish(err)
1137 }
1138 }()
1139 if as.sentLast {
1140 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1141 }
1142 if !as.desc.ClientStreams {
1143 as.sentLast = true
1144 }
1145 data, err := encode(as.codec, m)
1146 if err != nil {
1147 return err
1148 }
1149 compData, err := compress(data, as.cp, as.comp)
1150 if err != nil {
1151 return err
1152 }
1153 hdr, payld := msgHeader(data, compData)
1154 // TODO(dfawley): should we be checking len(data) instead?
1155 if len(payld) > *as.callInfo.maxSendMessageSize {
1156 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1157 }
1158
1159 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1160 if !as.desc.ClientStreams {
1161 // For non-client-streaming RPCs, we return nil instead of EOF on error
1162 // because the generated code requires it. finish is not called; RecvMsg()
1163 // will call it with the stream's status independently.
1164 return nil
1165 }
1166 return io.EOF
1167 }
1168
1169 if channelz.IsOn() {
1170 as.t.IncrMsgSent()
1171 }
1172 return nil
1173}
1174
1175func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1176 defer func() {
1177 if err != nil || !as.desc.ServerStreams {
1178 // err != nil or non-server-streaming indicates end of stream.
1179 as.finish(err)
1180 }
1181 }()
1182
1183 if !as.decompSet {
1184 // Block until we receive headers containing received message encoding.
1185 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1186 if as.dc == nil || as.dc.Type() != ct {
1187 // No configured decompressor, or it does not match the incoming
1188 // message encoding; attempt to find a registered compressor that does.
1189 as.dc = nil
1190 as.decomp = encoding.GetCompressor(ct)
1191 }
1192 } else {
1193 // No compression is used; disable our decompressor.
1194 as.dc = nil
1195 }
1196 // Only initialize this state once per stream.
1197 as.decompSet = true
1198 }
1199 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1200 if err != nil {
1201 if err == io.EOF {
1202 if statusErr := as.s.Status().Err(); statusErr != nil {
1203 return statusErr
1204 }
1205 return io.EOF // indicates successful end of stream.
1206 }
1207 return toRPCErr(err)
1208 }
1209
1210 if channelz.IsOn() {
1211 as.t.IncrMsgRecv()
1212 }
1213 if as.desc.ServerStreams {
1214 // Subsequent messages should be received by subsequent RecvMsg calls.
1215 return nil
1216 }
1217
1218 // Special handling for non-server-stream rpcs.
1219 // This recv expects EOF or errors, so we don't collect inPayload.
1220 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1221 if err == nil {
1222 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1223 }
1224 if err == io.EOF {
1225 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1226 }
1227 return toRPCErr(err)
1228}
1229
1230func (as *addrConnStream) finish(err error) {
1231 as.mu.Lock()
1232 if as.finished {
1233 as.mu.Unlock()
1234 return
1235 }
1236 as.finished = true
1237 if err == io.EOF {
1238 // Ending a stream with EOF indicates a success.
1239 err = nil
1240 }
1241 if as.s != nil {
1242 as.t.CloseStream(as.s, err)
1243 }
1244
1245 if err != nil {
1246 as.ac.incrCallsFailed()
1247 } else {
1248 as.ac.incrCallsSucceeded()
1249 }
1250 as.cancel()
1251 as.mu.Unlock()
1252}
1253
1254// ServerStream defines the server-side behavior of a streaming RPC.
1255//
1256// All errors returned from ServerStream methods are compatible with the
1257// status package.
1258type ServerStream interface {
1259 // SetHeader sets the header metadata. It may be called multiple times.
 1260 // When called multiple times, all the provided metadata will be merged.
1261 // All the metadata will be sent out when one of the following happens:
1262 // - ServerStream.SendHeader() is called;
1263 // - The first response is sent out;
1264 // - An RPC status is sent out (error or success).
1265 SetHeader(metadata.MD) error
1266 // SendHeader sends the header metadata.
1267 // The provided md and headers set by SetHeader() will be sent.
1268 // It fails if called multiple times.
1269 SendHeader(metadata.MD) error
1270 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1271 // When called more than once, all the provided metadata will be merged.
1272 SetTrailer(metadata.MD)
1273 // Context returns the context for this stream.
1274 Context() context.Context
1275 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1276 // error is returned directly.
1277 //
1278 // SendMsg blocks until:
1279 // - There is sufficient flow control to schedule m with the transport, or
1280 // - The stream is done, or
1281 // - The stream breaks.
1282 //
1283 // SendMsg does not wait until the message is received by the client. An
1284 // untimely stream closure may result in lost messages.
1285 //
1286 // It is safe to have a goroutine calling SendMsg and another goroutine
1287 // calling RecvMsg on the same stream at the same time, but it is not safe
1288 // to call SendMsg on the same stream in different goroutines.
1289 SendMsg(m interface{}) error
1290 // RecvMsg blocks until it receives a message into m or the stream is
1291 // done. It returns io.EOF when the client has performed a CloseSend. On
1292 // any non-EOF error, the stream is aborted and the error contains the
1293 // RPC status.
1294 //
1295 // It is safe to have a goroutine calling SendMsg and another goroutine
1296 // calling RecvMsg on the same stream at the same time, but it is not
1297 // safe to call RecvMsg on the same stream in different goroutines.
1298 RecvMsg(m interface{}) error
1299}
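
// A minimal sketch (the handler body and metadata keys are illustrative
// assumptions; real handlers send and receive concrete protobuf messages via
// generated wrappers). It shows when header and trailer metadata are sent
// relative to the RPC status, per the ServerStream documentation above.
func exampleMetadataHandler(srv interface{}, stream ServerStream) error {
	// Header metadata set here is flushed by SendHeader, by the first
	// response message, or together with the final status, whichever
	// happens first.
	if err := stream.SetHeader(metadata.Pairs("example-key", "v1")); err != nil {
		return err
	}
	// Trailer metadata is buffered and sent with the RPC status.
	stream.SetTrailer(metadata.Pairs("example-trailer", "done"))
	// Returning a status-package error sets the RPC status; returning nil
	// produces codes.OK.
	return status.Error(codes.Unimplemented, "example handler does not process messages")
}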
1300
1301// serverStream implements a server side Stream.
1302type serverStream struct {
1303 ctx context.Context
1304 t transport.ServerTransport
1305 s *transport.Stream
1306 p *parser
1307 codec baseCodec
1308
1309 cp Compressor
1310 dc Decompressor
1311 comp encoding.Compressor
1312 decomp encoding.Compressor
1313
1314 maxReceiveMessageSize int
1315 maxSendMessageSize int
1316 trInfo *traceInfo
1317
1318 statsHandler stats.Handler
1319
1320 binlog *binarylog.MethodLogger
 1321 // serverHeaderBinlogged indicates whether the server header has been logged.
 1322 // It is set the first time one of the following is called:
 1323 // stream.SendHeader(), stream.Send().
1324 //
1325 // It's only checked in send and sendHeader, doesn't need to be
1326 // synchronized.
1327 serverHeaderBinlogged bool
1328
1329 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1330}
1331
1332func (ss *serverStream) Context() context.Context {
1333 return ss.ctx
1334}
1335
1336func (ss *serverStream) SetHeader(md metadata.MD) error {
1337 if md.Len() == 0 {
1338 return nil
1339 }
1340 return ss.s.SetHeader(md)
1341}
1342
1343func (ss *serverStream) SendHeader(md metadata.MD) error {
1344 err := ss.t.WriteHeader(ss.s, md)
1345 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1346 h, _ := ss.s.Header()
1347 ss.binlog.Log(&binarylog.ServerHeader{
1348 Header: h,
1349 })
1350 ss.serverHeaderBinlogged = true
1351 }
1352 return err
1353}
1354
1355func (ss *serverStream) SetTrailer(md metadata.MD) {
1356 if md.Len() == 0 {
1357 return
1358 }
1359 ss.s.SetTrailer(md)
1360}
1361
1362func (ss *serverStream) SendMsg(m interface{}) (err error) {
1363 defer func() {
1364 if ss.trInfo != nil {
1365 ss.mu.Lock()
1366 if ss.trInfo.tr != nil {
1367 if err == nil {
1368 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1369 } else {
1370 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1371 ss.trInfo.tr.SetError()
1372 }
1373 }
1374 ss.mu.Unlock()
1375 }
1376 if err != nil && err != io.EOF {
1377 st, _ := status.FromError(toRPCErr(err))
1378 ss.t.WriteStatus(ss.s, st)
 1379 // A non-user-specified status was sent out. This should be an error
 1380 // case (e.g. a server-side cancellation).
 1381 //
 1382 // This is not handled specifically for now. The user will return a final
 1383 // status from the service handler; we will log that error instead.
 1384 // This behavior is similar to an interceptor.
1385 }
1386 if channelz.IsOn() && err == nil {
1387 ss.t.IncrMsgSent()
1388 }
1389 }()
1390 data, err := encode(ss.codec, m)
1391 if err != nil {
1392 return err
1393 }
1394 compData, err := compress(data, ss.cp, ss.comp)
1395 if err != nil {
1396 return err
1397 }
1398 hdr, payload := msgHeader(data, compData)
1399 // TODO(dfawley): should we be checking len(data) instead?
1400 if len(payload) > ss.maxSendMessageSize {
1401 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1402 }
1403 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1404 return toRPCErr(err)
1405 }
1406 if ss.binlog != nil {
1407 if !ss.serverHeaderBinlogged {
1408 h, _ := ss.s.Header()
1409 ss.binlog.Log(&binarylog.ServerHeader{
1410 Header: h,
1411 })
1412 ss.serverHeaderBinlogged = true
1413 }
1414 ss.binlog.Log(&binarylog.ServerMessage{
1415 Message: data,
1416 })
1417 }
1418 if ss.statsHandler != nil {
1419 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1420 }
1421 return nil
1422}
1423
1424func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1425 defer func() {
1426 if ss.trInfo != nil {
1427 ss.mu.Lock()
1428 if ss.trInfo.tr != nil {
1429 if err == nil {
1430 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1431 } else if err != io.EOF {
1432 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1433 ss.trInfo.tr.SetError()
1434 }
1435 }
1436 ss.mu.Unlock()
1437 }
1438 if err != nil && err != io.EOF {
1439 st, _ := status.FromError(toRPCErr(err))
1440 ss.t.WriteStatus(ss.s, st)
 1441 // A non-user-specified status was sent out. This should be an error
 1442 // case (e.g. a server-side cancellation).
 1443 //
 1444 // This is not handled specifically for now. The user will return a final
 1445 // status from the service handler; we will log that error instead.
 1446 // This behavior is similar to an interceptor.
1447 }
1448 if channelz.IsOn() && err == nil {
1449 ss.t.IncrMsgRecv()
1450 }
1451 }()
1452 var payInfo *payloadInfo
1453 if ss.statsHandler != nil || ss.binlog != nil {
1454 payInfo = &payloadInfo{}
1455 }
1456 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1457 if err == io.EOF {
1458 if ss.binlog != nil {
1459 ss.binlog.Log(&binarylog.ClientHalfClose{})
1460 }
1461 return err
1462 }
1463 if err == io.ErrUnexpectedEOF {
1464 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1465 }
1466 return toRPCErr(err)
1467 }
1468 if ss.statsHandler != nil {
1469 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1470 RecvTime: time.Now(),
1471 Payload: m,
1472 // TODO truncate large payload.
1473 Data: payInfo.uncompressedBytes,
1474 Length: len(payInfo.uncompressedBytes),
1475 })
1476 }
1477 if ss.binlog != nil {
1478 ss.binlog.Log(&binarylog.ClientMessage{
1479 Message: payInfo.uncompressedBytes,
1480 })
1481 }
1482 return nil
1483}
1484
1485// MethodFromServerStream returns the method string for the input stream.
1486// The returned string is in the format of "/service/method".
1487func MethodFromServerStream(stream ServerStream) (string, bool) {
1488 return Method(stream.Context())
1489}
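
// Illustrative only: a hypothetical handler wrapper (not part of this file)
// showing a typical use of MethodFromServerStream when the method name is
// needed and only the stream itself is available.
func exampleLoggingHandler(inner StreamHandler) StreamHandler {
	return func(srv interface{}, stream ServerStream) error {
		if m, ok := MethodFromServerStream(stream); ok {
			grpclog.Infof("streaming RPC started: %s", m)
		}
		return inner(srv, stream)
	}
}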