blob: bb99940e36fe21ed0fbd56210e0581959558bd11 [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/encoding"
34 "google.golang.org/grpc/grpclog"
35 "google.golang.org/grpc/internal/balancerload"
36 "google.golang.org/grpc/internal/binarylog"
37 "google.golang.org/grpc/internal/channelz"
38 "google.golang.org/grpc/internal/grpcrand"
39 "google.golang.org/grpc/internal/transport"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/peer"
42 "google.golang.org/grpc/stats"
43 "google.golang.org/grpc/status"
44)
45
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
//
// NOTE(review): srv is presumably the registered service implementation and
// stream the server side of the RPC being handled -- confirm against the
// server code, which is not part of this file section.
type StreamHandler func(srv interface{}, stream ServerStream) error
52
// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	// StreamName and Handler are not read by the client-side code in this
	// file.
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	//
	// ServerStreams: when false, the client stream is finished after a single
	// RecvMsg call, whatever its outcome.
	ServerStreams bool
	// ClientStreams: when false, the client treats its first SendMsg as the
	// last message of the send direction.
	ClientStreams bool
}
62
// Stream defines the common interface a client or server stream has to satisfy.
// ClientStream declares these same three methods with their current
// documentation.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}
74
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. The stream is
	// closed when a non-nil error is encountered. It is also not safe to
	// call CloseSend concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
125
126// NewStream creates a new Stream for the client side. This is typically
127// called by generated code. ctx is used for the lifetime of the stream.
128//
129// To ensure resources are not leaked due to the stream returned, one of the following
130// actions must be performed:
131//
132// 1. Call Close on the ClientConn.
133// 2. Cancel the context provided.
134// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
135// client-streaming RPC, for instance, might use the helper function
136// CloseAndRecv (note that CloseSend does not Recv, therefore is not
137// guaranteed to release all resources).
138// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
139//
140// If none of the above happen, a goroutine and a context will be leaked, and grpc
141// will not call the optionally-configured stats handler with a stats.End message.
142func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
143 // allow interceptor to see all applicable call options, which means those
144 // configured as defaults from dial option as well as per-call options
145 opts = combine(cc.dopts.callOptions, opts)
146
147 if cc.dopts.streamInt != nil {
148 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
149 }
150 return newClientStream(ctx, desc, cc, method, opts...)
151}
152
// NewClientStream is a wrapper for ClientConn.NewStream.
//
// It forwards ctx, desc, method and opts to cc.NewStream unchanged and
// returns whatever that call returns.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}
157
// newClientStream builds the clientStream for a single RPC on cc.
//
// In order, it: waits for resolved addresses, applies the method config
// (wait-for-ready and timeout), runs the before hook of every call option,
// resolves message size limits and the codec, selects the outgoing
// compressor, sets up tracing and stats when enabled, creates the first
// attempt and opens its transport stream (with retry), and writes the client
// header to the binary log when a method logger exists.
//
// On any error a nil stream is returned; the deferred functions then cancel
// the derived context and, when channelz is on, count the call as failed.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		// The named result err is inspected at return time.
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// Release the derived context when stream creation fails; on success the
	// cancel function is stored in the clientStream and called by finish.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	// retryThrottler stays nil when retries are disabled.
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	// Opening the transport stream is the first buffered operation, so that
	// it is replayed on a retry attempt.
	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
329
330// newAttemptLocked creates a new attempt with a transport.
331// If it succeeds, then it replaces clientStream's attempt with this new attempt.
332func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
333 newAttempt := &csAttempt{
334 cs: cs,
335 dc: cs.cc.dopts.dc,
336 statsHandler: sh,
337 trInfo: trInfo,
338 }
339 defer func() {
340 if retErr != nil {
341 // This attempt is not set in the clientStream, so it's finish won't
342 // be called. Call it here for stats and trace in case they are not
343 // nil.
344 newAttempt.finish(retErr)
345 }
346 }()
347
348 if err := cs.ctx.Err(); err != nil {
349 return toRPCErr(err)
350 }
351 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
352 if err != nil {
353 return err
354 }
355 if trInfo != nil {
356 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
357 }
358 newAttempt.t = t
359 newAttempt.done = done
360 cs.attempt = newAttempt
361 return nil
362}
363
364func (a *csAttempt) newStream() error {
365 cs := a.cs
366 cs.callHdr.PreviousAttempts = cs.numRetries
367 s, err := a.t.NewStream(cs.ctx, cs.callHdr)
368 if err != nil {
369 return toRPCErr(err)
370 }
371 cs.attempt.s = s
372 cs.attempt.p = &parser{r: s}
373 return nil
374}
375
// clientStream implements a client side Stream.
//
// One clientStream exists per RPC; it owns the sequence of attempts
// (csAttempt) made for that RPC and the buffer of operations that are
// replayed on a new attempt.
type clientStream struct {
	callHdr  *transport.CallHdr // header passed to the transport for each attempt
	opts     []CallOption       // call options; their after hooks run in finish
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlog *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt.
	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
	// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
	// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
	// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
	// place where we need to check if the attempt is nil.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}
426
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs *clientStream             // owning stream
	t  transport.ClientTransport // transport picked for this attempt
	s  *transport.Stream         // nil until newStream succeeds
	p  *parser                   // reads messages from s; set together with s
	// done is the callback obtained together with the transport; finish
	// invokes it, when non-nil, with a balancer.DoneInfo.
	done func(balancer.DoneInfo)

	finished bool // set by finish so that its work runs only once
	// dc starts as the ClientConn's configured decompressor and is cleared by
	// recvMsg when it does not apply to the incoming encoding; decomp is then
	// looked up in the encoding package. decompSet records that this choice
	// has been made for the stream.
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandler stats.Handler // may be nil
}
449
450func (cs *clientStream) commitAttemptLocked() {
451 cs.committed = true
452 cs.buffer = nil
453}
454
455func (cs *clientStream) commitAttempt() {
456 cs.mu.Lock()
457 cs.commitAttemptLocked()
458 cs.mu.Unlock()
459}
460
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
//
// err is the error of the failed attempt. The method may block: it waits for
// the attempt's stream to be done and, before a policy-driven retry, sleeps
// for the backoff (or server pushback) duration unless cs.ctx is done first.
// In this file it is only called from retryLocked.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		// Only trailers-only responses are considered for a retry.
		if !cs.attempt.s.TrailersOnly() {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			// An unparsable or negative value means the server asked not to
			// retry.
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	// Take the status code from the stream when there is one, otherwise from
	// the error itself.
	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	var dur time.Duration
	if hasPushback {
		// The server dictated the delay; restart the backoff sequence.
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		// Exponential backoff capped at maxBackoff, then a random duration
		// in [0, cur) is drawn.
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}
560
// Returns nil if a retry was performed and succeeded; error otherwise.
//
// Each iteration finishes the current attempt with lastErr, asks shouldRetry
// whether another attempt is allowed, creates that attempt and replays the
// buffered operations on it. A failing replay starts the next iteration with
// the replay error. When shouldRetry refuses, the current attempt is
// committed before the error is returned.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		// New attempts carry no stats handler and no trace info.
		if err := cs.newAttemptLocked(nil, nil); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}
577
578func (cs *clientStream) Context() context.Context {
579 cs.commitAttempt()
580 // No need to lock before using attempt, since we know it is committed and
581 // cannot change.
582 return cs.attempt.s.Context()
583}
584
// withRetry runs op against the active attempt and, when op fails on an
// uncommitted attempt, goes through retryLocked and runs op again on the new
// attempt.
//
// op is always invoked without cs.mu held. onSuccess is invoked with cs.mu
// held when op returns nil, or io.EOF on a stream whose status code is OK.
// Once the attempt is committed, op runs exactly once and its result is
// returned as is.
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		// io.EOF alone does not say how the stream ended; wait for the stream
		// to be done so that its status can be inspected below.
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		// retryLocked returning nil means a new attempt is in place; loop to
		// run op on it.
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
614
615func (cs *clientStream) Header() (metadata.MD, error) {
616 var m metadata.MD
617 err := cs.withRetry(func(a *csAttempt) error {
618 var err error
619 m, err = a.s.Header()
620 return toRPCErr(err)
621 }, cs.commitAttemptLocked)
622 if err != nil {
623 cs.finish(err)
624 return nil, err
625 }
626 if cs.binlog != nil && !cs.serverHeaderBinlogged {
627 // Only log if binary log is on and header has not been logged.
628 logEntry := &binarylog.ServerHeader{
629 OnClientSide: true,
630 Header: m,
631 PeerAddr: nil,
632 }
633 if peer, ok := peer.FromContext(cs.Context()); ok {
634 logEntry.PeerAddr = peer.Addr
635 }
636 cs.binlog.Log(logEntry)
637 cs.serverHeaderBinlogged = true
638 }
639 return m, err
640}
641
642func (cs *clientStream) Trailer() metadata.MD {
643 // On RPC failure, we never need to retry, because usage requires that
644 // RecvMsg() returned a non-nil error before calling this function is valid.
645 // We would have retried earlier if necessary.
646 //
647 // Commit the attempt anyway, just in case users are not following those
648 // directions -- it will prevent races and should not meaningfully impact
649 // performance.
650 cs.commitAttempt()
651 if cs.attempt.s == nil {
652 return nil
653 }
654 return cs.attempt.s.Trailer()
655}
656
657func (cs *clientStream) replayBufferLocked() error {
658 a := cs.attempt
659 for _, f := range cs.buffer {
660 if err := f(a); err != nil {
661 return err
662 }
663 }
664 return nil
665}
666
667func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
668 // Note: we still will buffer if retry is disabled (for transparent retries).
669 if cs.committed {
670 return
671 }
672 cs.bufferSize += sz
673 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
674 cs.commitAttemptLocked()
675 return
676 }
677 cs.buffer = append(cs.buffer, op)
678}
679
// SendMsg encodes m and sends it on the active attempt, buffering the send
// for replay while the attempt is not committed.
//
// It returns an Internal status error when called after the send direction
// was closed, a ResourceExhausted status error when the payload exceeds the
// configured maximum send size, and otherwise the result of the send. Any
// returned error other than io.EOF finishes the stream.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	// For non-client-streaming RPCs the first message is also the last one.
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	// The buffered size counts the bytes written to the wire: header plus
	// payload.
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	return
}
725
726func (cs *clientStream) RecvMsg(m interface{}) error {
727 if cs.binlog != nil && !cs.serverHeaderBinlogged {
728 // Call Header() to binary log header if it's not already logged.
729 cs.Header()
730 }
731 var recvInfo *payloadInfo
732 if cs.binlog != nil {
733 recvInfo = &payloadInfo{}
734 }
735 err := cs.withRetry(func(a *csAttempt) error {
736 return a.recvMsg(m, recvInfo)
737 }, cs.commitAttemptLocked)
738 if cs.binlog != nil && err == nil {
739 cs.binlog.Log(&binarylog.ServerMessage{
740 OnClientSide: true,
741 Message: recvInfo.uncompressedBytes,
742 })
743 }
744 if err != nil || !cs.desc.ServerStreams {
745 // err != nil or non-server-streaming indicates end of stream.
746 cs.finish(err)
747
748 if cs.binlog != nil {
749 // finish will not log Trailer. Log Trailer here.
750 logEntry := &binarylog.ServerTrailer{
751 OnClientSide: true,
752 Trailer: cs.Trailer(),
753 Err: err,
754 }
755 if logEntry.Err == io.EOF {
756 logEntry.Err = nil
757 }
758 if peer, ok := peer.FromContext(cs.Context()); ok {
759 logEntry.PeerAddr = peer.Addr
760 }
761 cs.binlog.Log(logEntry)
762 }
763 }
764 return err
765}
766
767func (cs *clientStream) CloseSend() error {
768 if cs.sentLast {
769 // TODO: return an error and finish the stream instead, due to API misuse?
770 return nil
771 }
772 cs.sentLast = true
773 op := func(a *csAttempt) error {
774 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
775 // Always return nil; io.EOF is the only error that might make sense
776 // instead, but there is no need to signal the client to call RecvMsg
777 // as the only use left for the stream after CloseSend is to call
778 // RecvMsg. This also matches historical behavior.
779 return nil
780 }
781 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
782 if cs.binlog != nil {
783 cs.binlog.Log(&binarylog.ClientHalfClose{
784 OnClientSide: true,
785 })
786 }
787 // We never returned an error here for reasons.
788 return nil
789}
790
// finish ends the RPC with err; io.EOF is treated as success. Only the first
// call has any effect.
//
// It commits the attempt, logs a cancellation to the binary log when err has
// code Canceled, reports a success to the retry throttler, updates the
// channelz call counters, finishes the active attempt (if any), runs the
// after hook of every call option when the attempt has a transport stream,
// and finally cancels the stream's context.
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	// For binary logging. only log cancel in finish (could be caused by RPC ctx
	// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if cs.binlog != nil && status.Code(err) == codes.Canceled {
		cs.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	// attempt is nil only when creating the first attempt failed in
	// newClientStream.
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.s != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo)
			}
		}
	}
	cs.cancel()
}
835
836func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
837 cs := a.cs
838 if a.trInfo != nil {
839 a.mu.Lock()
840 if a.trInfo.tr != nil {
841 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
842 }
843 a.mu.Unlock()
844 }
845 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
846 if !cs.desc.ClientStreams {
847 // For non-client-streaming RPCs, we return nil instead of EOF on error
848 // because the generated code requires it. finish is not called; RecvMsg()
849 // will call it with the stream's status independently.
850 return nil
851 }
852 return io.EOF
853 }
854 if a.statsHandler != nil {
855 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
856 }
857 if channelz.IsOn() {
858 a.t.IncrMsgSent()
859 }
860 return nil
861}
862
// recvMsg receives one message from the attempt's stream into m. payInfo,
// when non-nil, is filled with payload details; it is allocated here if a
// stats handler is set and the caller passed nil.
//
// It returns io.EOF when the stream ended with an OK status, the stream's
// status error when it ended otherwise, and other receive errors converted
// with toRPCErr. For non-server-streaming RPCs a second receive is performed
// to observe the end of the stream, and nil is returned on success.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		// A second message on a non-server-streaming RPC is a protocol
		// violation.
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
931
932func (a *csAttempt) finish(err error) {
933 a.mu.Lock()
934 if a.finished {
935 a.mu.Unlock()
936 return
937 }
938 a.finished = true
939 if err == io.EOF {
940 // Ending a stream with EOF indicates a success.
941 err = nil
942 }
943 var tr metadata.MD
944 if a.s != nil {
945 a.t.CloseStream(a.s, err)
946 tr = a.s.Trailer()
947 }
948
949 if a.done != nil {
950 br := false
951 if a.s != nil {
952 br = a.s.BytesReceived()
953 }
954 a.done(balancer.DoneInfo{
955 Err: err,
956 Trailer: tr,
957 BytesSent: a.s != nil,
958 BytesReceived: br,
959 ServerLoad: balancerload.Parse(tr),
960 })
961 }
962 if a.statsHandler != nil {
963 end := &stats.End{
964 Client: true,
965 BeginTime: a.cs.beginTime,
966 EndTime: time.Now(),
967 Trailer: tr,
968 Error: err,
969 }
970 a.statsHandler.HandleRPC(a.cs.ctx, end)
971 }
972 if a.trInfo != nil && a.trInfo.tr != nil {
973 if err == nil {
974 a.trInfo.tr.LazyPrintf("RPC: [OK]")
975 } else {
976 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
977 a.trInfo.tr.SetError()
978 }
979 a.trInfo.tr.Finish()
980 a.trInfo.tr = nil
981 }
982 a.mu.Unlock()
983}
984
985// newClientStream creates a ClientStream with the specified transport, on the
986// given addrConn.
987//
988// It's expected that the given transport is either the same one in addrConn, or
989// is already closed. To avoid race, transport is specified separately, instead
990// of using ac.transpot.
991//
992// Main difference between this and ClientConn.NewStream:
993// - no retry
994// - no service config (or wait for service config)
995// - no tracing or stats
996func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
997 if t == nil {
998 // TODO: return RPC error here?
999 return nil, errors.New("transport provided is nil")
1000 }
1001 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1002 c := &callInfo{}
1003
1004 // Possible context leak:
1005 // The cancel function for the child context we create will only be called
1006 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1007 // an error is generated by SendMsg.
1008 // https://github.com/grpc/grpc-go/issues/1818.
1009 ctx, cancel := context.WithCancel(ctx)
1010 defer func() {
1011 if err != nil {
1012 cancel()
1013 }
1014 }()
1015
1016 for _, o := range opts {
1017 if err := o.before(c); err != nil {
1018 return nil, toRPCErr(err)
1019 }
1020 }
1021 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1022 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1023 if err := setCallInfoCodec(c); err != nil {
1024 return nil, err
1025 }
1026
1027 callHdr := &transport.CallHdr{
1028 Host: ac.cc.authority,
1029 Method: method,
1030 ContentSubtype: c.contentSubtype,
1031 }
1032
1033 // Set our outgoing compression according to the UseCompressor CallOption, if
1034 // set. In that case, also find the compressor from the encoding package.
1035 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1036 // if set.
1037 var cp Compressor
1038 var comp encoding.Compressor
1039 if ct := c.compressorType; ct != "" {
1040 callHdr.SendCompress = ct
1041 if ct != encoding.Identity {
1042 comp = encoding.GetCompressor(ct)
1043 if comp == nil {
1044 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1045 }
1046 }
1047 } else if ac.cc.dopts.cp != nil {
1048 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1049 cp = ac.cc.dopts.cp
1050 }
1051 if c.creds != nil {
1052 callHdr.Creds = c.creds
1053 }
1054
1055 // Use a special addrConnStream to avoid retry.
1056 as := &addrConnStream{
1057 callHdr: callHdr,
1058 ac: ac,
1059 ctx: ctx,
1060 cancel: cancel,
1061 opts: opts,
1062 callInfo: c,
1063 desc: desc,
1064 codec: c.codec,
1065 cp: cp,
1066 comp: comp,
1067 t: t,
1068 }
1069
1070 as.callInfo.stream = as
1071 s, err := as.t.NewStream(as.ctx, as.callHdr)
1072 if err != nil {
1073 err = toRPCErr(err)
1074 return nil, err
1075 }
1076 as.s = s
1077 as.p = &parser{r: s}
1078 ac.incrCallsStarted()
1079 if desc != unaryStreamDesc {
1080 // Listen on cc and stream contexts to cleanup when the user closes the
1081 // ClientConn or cancels the stream context. In all other cases, an error
1082 // should already be injected into the recv buffer by the transport, which
1083 // the client will eventually receive, and then we will cancel the stream's
1084 // context in clientStream.finish.
1085 go func() {
1086 select {
1087 case <-ac.ctx.Done():
1088 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1089 case <-ctx.Done():
1090 as.finish(toRPCErr(ctx.Err()))
1091 }
1092 }()
1093 }
1094 return as, nil
1095}
1096
// addrConnStream is the stream type built by newNonRetryClientStream. It is
// tied to a single transport and addrConn.
type addrConnStream struct {
	// s is the transport stream obtained from t.NewStream.
	s *transport.Stream
	// ac supplies the call counters updated when the stream finishes.
	ac *addrConn
	// callHdr is the call header that was passed to t.NewStream.
	callHdr *transport.CallHdr
	// cancel cancels ctx; finish calls it.
	cancel   context.CancelFunc
	opts     []CallOption
	callInfo *callInfo
	// t is the transport used to write to and close s.
	t   transport.ClientTransport
	ctx context.Context
	// sentLast is set once CloseSend has run, or once SendMsg has been called
	// on a non-client-streaming RPC.
	sentLast bool
	desc     *StreamDesc
	codec    baseCodec
	// cp and comp are the compressors handed to prepareMsg for outgoing
	// messages.
	cp   Compressor
	comp encoding.Compressor
	// decompSet records that RecvMsg has already initialized dc and decomp.
	decompSet bool
	dc        Decompressor
	decomp    encoding.Compressor
	p         *parser
	// mu is held by finish while it checks and sets finished.
	mu       sync.Mutex
	finished bool
}
1118
1119func (as *addrConnStream) Header() (metadata.MD, error) {
1120 m, err := as.s.Header()
1121 if err != nil {
1122 as.finish(toRPCErr(err))
1123 }
1124 return m, err
1125}
1126
// Trailer returns the trailer metadata reported by the underlying transport
// stream.
func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}
1130
1131func (as *addrConnStream) CloseSend() error {
1132 if as.sentLast {
1133 // TODO: return an error and finish the stream instead, due to API misuse?
1134 return nil
1135 }
1136 as.sentLast = true
1137
1138 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1139 // Always return nil; io.EOF is the only error that might make sense
1140 // instead, but there is no need to signal the client to call RecvMsg
1141 // as the only use left for the stream after CloseSend is to call
1142 // RecvMsg. This also matches historical behavior.
1143 return nil
1144}
1145
// Context returns the context of the underlying transport stream.
func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}
1149
1150func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1151 defer func() {
1152 if err != nil && err != io.EOF {
1153 // Call finish on the client stream for errors generated by this SendMsg
1154 // call, as these indicate problems created by this client. (Transport
1155 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1156 // error will be returned from RecvMsg eventually in that case, or be
1157 // retried.)
1158 as.finish(err)
1159 }
1160 }()
1161 if as.sentLast {
1162 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1163 }
1164 if !as.desc.ClientStreams {
1165 as.sentLast = true
1166 }
1167
1168 // load hdr, payload, data
1169 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
1170 if err != nil {
1171 return err
1172 }
1173
1174 // TODO(dfawley): should we be checking len(data) instead?
1175 if len(payld) > *as.callInfo.maxSendMessageSize {
1176 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1177 }
1178
1179 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1180 if !as.desc.ClientStreams {
1181 // For non-client-streaming RPCs, we return nil instead of EOF on error
1182 // because the generated code requires it. finish is not called; RecvMsg()
1183 // will call it with the stream's status independently.
1184 return nil
1185 }
1186 return io.EOF
1187 }
1188
1189 if channelz.IsOn() {
1190 as.t.IncrMsgSent()
1191 }
1192 return nil
1193}
1194
1195func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1196 defer func() {
1197 if err != nil || !as.desc.ServerStreams {
1198 // err != nil or non-server-streaming indicates end of stream.
1199 as.finish(err)
1200 }
1201 }()
1202
1203 if !as.decompSet {
1204 // Block until we receive headers containing received message encoding.
1205 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1206 if as.dc == nil || as.dc.Type() != ct {
1207 // No configured decompressor, or it does not match the incoming
1208 // message encoding; attempt to find a registered compressor that does.
1209 as.dc = nil
1210 as.decomp = encoding.GetCompressor(ct)
1211 }
1212 } else {
1213 // No compression is used; disable our decompressor.
1214 as.dc = nil
1215 }
1216 // Only initialize this state once per stream.
1217 as.decompSet = true
1218 }
1219 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1220 if err != nil {
1221 if err == io.EOF {
1222 if statusErr := as.s.Status().Err(); statusErr != nil {
1223 return statusErr
1224 }
1225 return io.EOF // indicates successful end of stream.
1226 }
1227 return toRPCErr(err)
1228 }
1229
1230 if channelz.IsOn() {
1231 as.t.IncrMsgRecv()
1232 }
1233 if as.desc.ServerStreams {
1234 // Subsequent messages should be received by subsequent RecvMsg calls.
1235 return nil
1236 }
1237
1238 // Special handling for non-server-stream rpcs.
1239 // This recv expects EOF or errors, so we don't collect inPayload.
1240 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1241 if err == nil {
1242 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1243 }
1244 if err == io.EOF {
1245 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1246 }
1247 return toRPCErr(err)
1248}
1249
1250func (as *addrConnStream) finish(err error) {
1251 as.mu.Lock()
1252 if as.finished {
1253 as.mu.Unlock()
1254 return
1255 }
1256 as.finished = true
1257 if err == io.EOF {
1258 // Ending a stream with EOF indicates a success.
1259 err = nil
1260 }
1261 if as.s != nil {
1262 as.t.CloseStream(as.s, err)
1263 }
1264
1265 if err != nil {
1266 as.ac.incrCallsFailed()
1267 } else {
1268 as.ac.incrCallsSucceeded()
1269 }
1270 as.cancel()
1271 as.mu.Unlock()
1272}
1273
// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
1320
// serverStream implements a server side Stream.
type serverStream struct {
	// ctx is the context returned by Context.
	ctx context.Context
	// t is the transport used to write headers, messages and status for s.
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	// cp and comp are handed to prepareMsg for outgoing messages; dc and
	// decomp are handed to recv for incoming messages.
	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	// Message size limits enforced by RecvMsg and SendMsg respectively.
	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	// statsHandler, when non-nil, is notified of each payload sent or
	// received.
	statsHandler stats.Handler

	// binlog, when non-nil, receives binary log entries for headers and
	// messages.
	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
1351
// Context returns the context stored on the stream.
func (ss *serverStream) Context() context.Context {
	return ss.ctx
}
1355
1356func (ss *serverStream) SetHeader(md metadata.MD) error {
1357 if md.Len() == 0 {
1358 return nil
1359 }
1360 return ss.s.SetHeader(md)
1361}
1362
1363func (ss *serverStream) SendHeader(md metadata.MD) error {
1364 err := ss.t.WriteHeader(ss.s, md)
1365 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1366 h, _ := ss.s.Header()
1367 ss.binlog.Log(&binarylog.ServerHeader{
1368 Header: h,
1369 })
1370 ss.serverHeaderBinlogged = true
1371 }
1372 return err
1373}
1374
1375func (ss *serverStream) SetTrailer(md metadata.MD) {
1376 if md.Len() == 0 {
1377 return
1378 }
1379 ss.s.SetTrailer(md)
1380}
1381
1382func (ss *serverStream) SendMsg(m interface{}) (err error) {
1383 defer func() {
1384 if ss.trInfo != nil {
1385 ss.mu.Lock()
1386 if ss.trInfo.tr != nil {
1387 if err == nil {
1388 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1389 } else {
1390 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1391 ss.trInfo.tr.SetError()
1392 }
1393 }
1394 ss.mu.Unlock()
1395 }
1396 if err != nil && err != io.EOF {
1397 st, _ := status.FromError(toRPCErr(err))
1398 ss.t.WriteStatus(ss.s, st)
1399 // Non-user specified status was sent out. This should be an error
1400 // case (as a server side Cancel maybe).
1401 //
1402 // This is not handled specifically now. User will return a final
1403 // status from the service handler, we will log that error instead.
1404 // This behavior is similar to an interceptor.
1405 }
1406 if channelz.IsOn() && err == nil {
1407 ss.t.IncrMsgSent()
1408 }
1409 }()
1410
1411 // load hdr, payload, data
1412 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
1413 if err != nil {
1414 return err
1415 }
1416
1417 // TODO(dfawley): should we be checking len(data) instead?
1418 if len(payload) > ss.maxSendMessageSize {
1419 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1420 }
1421 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1422 return toRPCErr(err)
1423 }
1424 if ss.binlog != nil {
1425 if !ss.serverHeaderBinlogged {
1426 h, _ := ss.s.Header()
1427 ss.binlog.Log(&binarylog.ServerHeader{
1428 Header: h,
1429 })
1430 ss.serverHeaderBinlogged = true
1431 }
1432 ss.binlog.Log(&binarylog.ServerMessage{
1433 Message: data,
1434 })
1435 }
1436 if ss.statsHandler != nil {
1437 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1438 }
1439 return nil
1440}
1441
1442func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1443 defer func() {
1444 if ss.trInfo != nil {
1445 ss.mu.Lock()
1446 if ss.trInfo.tr != nil {
1447 if err == nil {
1448 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1449 } else if err != io.EOF {
1450 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1451 ss.trInfo.tr.SetError()
1452 }
1453 }
1454 ss.mu.Unlock()
1455 }
1456 if err != nil && err != io.EOF {
1457 st, _ := status.FromError(toRPCErr(err))
1458 ss.t.WriteStatus(ss.s, st)
1459 // Non-user specified status was sent out. This should be an error
1460 // case (as a server side Cancel maybe).
1461 //
1462 // This is not handled specifically now. User will return a final
1463 // status from the service handler, we will log that error instead.
1464 // This behavior is similar to an interceptor.
1465 }
1466 if channelz.IsOn() && err == nil {
1467 ss.t.IncrMsgRecv()
1468 }
1469 }()
1470 var payInfo *payloadInfo
1471 if ss.statsHandler != nil || ss.binlog != nil {
1472 payInfo = &payloadInfo{}
1473 }
1474 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1475 if err == io.EOF {
1476 if ss.binlog != nil {
1477 ss.binlog.Log(&binarylog.ClientHalfClose{})
1478 }
1479 return err
1480 }
1481 if err == io.ErrUnexpectedEOF {
1482 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1483 }
1484 return toRPCErr(err)
1485 }
1486 if ss.statsHandler != nil {
1487 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1488 RecvTime: time.Now(),
1489 Payload: m,
1490 // TODO truncate large payload.
1491 Data: payInfo.uncompressedBytes,
1492 WireLength: payInfo.wireLength,
1493 Length: len(payInfo.uncompressedBytes),
1494 })
1495 }
1496 if ss.binlog != nil {
1497 ss.binlog.Log(&binarylog.ClientMessage{
1498 Message: payInfo.uncompressedBytes,
1499 })
1500 }
1501 return nil
1502}
1503
// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
//
// It is a convenience wrapper that calls Method with the stream's context and
// returns its results unchanged.
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
1509
1510// prepareMsg returns the hdr, payload and data
1511// using the compressors passed or using the
1512// passed preparedmsg
1513func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
1514 if preparedMsg, ok := m.(*PreparedMsg); ok {
1515 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
1516 }
1517 // The input interface is not a prepared msg.
1518 // Marshal and Compress the data at this point
1519 data, err = encode(codec, m)
1520 if err != nil {
1521 return nil, nil, nil, err
1522 }
1523 compData, err := compress(data, cp, comp)
1524 if err != nil {
1525 return nil, nil, nil, err
1526 }
1527 hdr, payload = msgHeader(data, compData)
1528 return hdr, payload, data, nil
1529}