1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/encoding"
34 "google.golang.org/grpc/internal/balancerload"
35 "google.golang.org/grpc/internal/binarylog"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/internal/grpcrand"
38 "google.golang.org/grpc/internal/grpcutil"
39 iresolver "google.golang.org/grpc/internal/resolver"
40 "google.golang.org/grpc/internal/serviceconfig"
41 "google.golang.org/grpc/internal/transport"
42 "google.golang.org/grpc/metadata"
43 "google.golang.org/grpc/peer"
44 "google.golang.org/grpc/stats"
45 "google.golang.org/grpc/status"
46)
47
48// StreamHandler defines the handler called by the gRPC server to complete the
49// execution of a streaming RPC. If a StreamHandler returns an error, it
50// should be produced by the status package, or else gRPC will use
51// codes.Unknown as the status code and err.Error() as the status message
52// of the RPC.
53type StreamHandler func(srv interface{}, stream ServerStream) error
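// Illustrative sketch (not part of the gRPC API): a StreamHandler that fails
// the RPC with a status error so the client observes codes.Unimplemented
// rather than codes.Unknown, as the comment above recommends.
func exampleStreamHandler(srv interface{}, stream ServerStream) error {
	return status.Error(codes.Unimplemented, "streaming method not implemented")
}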
54
55// StreamDesc represents a streaming RPC service's method specification. Used
56// on the server when registering services and on the client when initiating
57// new streams.
58type StreamDesc struct {
59 // StreamName and Handler are only used when registering handlers on a
60 // server.
61 StreamName string // the name of the method excluding the service
62 Handler StreamHandler // the handler called for the method
63
64 // ServerStreams and ClientStreams are used for registering handlers on a
65 // server as well as defining RPC behavior when passed to NewClientStream
66 // and ClientConn.NewStream. At least one must be true.
67 ServerStreams bool // indicates the server can perform streaming sends
68 ClientStreams bool // indicates the client can perform streaming sends
69}
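// Illustrative sketch (hypothetical method name and handler): a StreamDesc as
// generated code might construct it for a bidirectional-streaming method.
var exampleBidiStreamDesc = StreamDesc{
	StreamName: "Chat", // method name without the service prefix
	Handler: func(srv interface{}, stream ServerStream) error {
		return status.Error(codes.Unimplemented, "not implemented")
	},
	ServerStreams: true, // the server may stream responses
	ClientStreams: true, // the client may stream requests
}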
70
71// Stream defines the common interface a client or server stream has to satisfy.
72//
73// Deprecated: See ClientStream and ServerStream documentation instead.
74type Stream interface {
75 // Deprecated: See ClientStream and ServerStream documentation instead.
76 Context() context.Context
77 // Deprecated: See ClientStream and ServerStream documentation instead.
78 SendMsg(m interface{}) error
79 // Deprecated: See ClientStream and ServerStream documentation instead.
80 RecvMsg(m interface{}) error
81}
82
83// ClientStream defines the client-side behavior of a streaming RPC.
84//
85// All errors returned from ClientStream methods are compatible with the
86// status package.
87type ClientStream interface {
88 // Header returns the header metadata received from the server if there
89 // is any. It blocks if the metadata is not ready to read.
90 Header() (metadata.MD, error)
91 // Trailer returns the trailer metadata from the server, if there is any.
92 // It must only be called after stream.CloseAndRecv has returned, or
93 // stream.Recv has returned a non-nil error (including io.EOF).
94 Trailer() metadata.MD
95 // CloseSend closes the send direction of the stream. It closes the stream
96 // when a non-nil error is encountered. It is also not safe to call CloseSend
97 // concurrently with SendMsg.
98 CloseSend() error
99 // Context returns the context for this stream.
100 //
101 // It should not be called until after Header or RecvMsg has returned. Once
102 // called, subsequent client-side retries are disabled.
103 Context() context.Context
104 // SendMsg is generally called by generated code. On error, SendMsg aborts
105 // the stream. If the error was generated by the client, the status is
106 // returned directly; otherwise, io.EOF is returned and the status of
107 // the stream may be discovered using RecvMsg.
108 //
109 // SendMsg blocks until:
110 // - There is sufficient flow control to schedule m with the transport, or
111 // - The stream is done, or
112 // - The stream breaks.
113 //
114 // SendMsg does not wait until the message is received by the server. An
115 // untimely stream closure may result in lost messages. To ensure delivery,
116 // users should ensure the RPC completed successfully using RecvMsg.
117 //
118 // It is safe to have a goroutine calling SendMsg and another goroutine
119 // calling RecvMsg on the same stream at the same time, but it is not safe
120 // to call SendMsg on the same stream in different goroutines. It is also
121 // not safe to call CloseSend concurrently with SendMsg.
122 SendMsg(m interface{}) error
123 // RecvMsg blocks until it receives a message into m or the stream is
124 // done. It returns io.EOF when the stream completes successfully. On
125 // any other error, the stream is aborted and the error contains the RPC
126 // status.
127 //
128 // It is safe to have a goroutine calling SendMsg and another goroutine
129 // calling RecvMsg on the same stream at the same time, but it is not
130 // safe to call RecvMsg on the same stream in different goroutines.
131 RecvMsg(m interface{}) error
132}
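// Illustrative sketch (not part of the gRPC API): driving a client-streaming
// RPC through the ClientStream interface above; desc, method, msgs and reply
// are hypothetical placeholders supplied by the caller.
func exampleClientStreamingCall(ctx context.Context, cc *ClientConn, desc *StreamDesc, method string, msgs []interface{}, reply interface{}) error {
	stream, err := cc.NewStream(ctx, desc, method)
	if err != nil {
		return err
	}
	for _, m := range msgs {
		if err := stream.SendMsg(m); err != nil {
			// An error (often io.EOF) means the stream ended; the real
			// status is surfaced by RecvMsg below.
			break
		}
	}
	if err := stream.CloseSend(); err != nil {
		return err
	}
	// For a single (unary) response, one RecvMsg call receives the reply and
	// reports the final RPC status.
	return stream.RecvMsg(reply)
}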
133
134// NewStream creates a new Stream for the client side. This is typically
135// called by generated code. ctx is used for the lifetime of the stream.
136//
137// To ensure resources are not leaked due to the stream returned, one of the following
138// actions must be performed:
139//
140// 1. Call Close on the ClientConn.
141// 2. Cancel the context provided.
142// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
143// client-streaming RPC, for instance, might use the helper function
144// CloseAndRecv (note that CloseSend does not Recv, therefore is not
145// guaranteed to release all resources).
146// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
147//
148// If none of the above happen, a goroutine and a context will be leaked, and grpc
149// will not call the optionally-configured stats handler with a stats.End message.
150func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
151 // Allow the interceptor to see all applicable call options, which means those
152 // configured as defaults from the dial options as well as per-call options.
153 opts = combine(cc.dopts.callOptions, opts)
154
155 if cc.dopts.streamInt != nil {
156 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
157 }
158 return newClientStream(ctx, desc, cc, method, opts...)
159}
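// Illustrative sketch (not part of the gRPC API): one way to satisfy the
// cleanup rules listed in NewStream's doc comment is to derive a cancellable
// context and always cancel it once the stream is no longer needed (rule 2).
func exampleNewStreamWithCleanup(ctx context.Context, cc *ClientConn, desc *StreamDesc, method string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // releases the stream's goroutine and context, even on early return
	stream, err := cc.NewStream(ctx, desc, method)
	if err != nil {
		return err
	}
	// Use the stream here; draining it with RecvMsg until a non-nil error is
	// returned (rule 3) would also release all resources.
	_ = stream
	return nil
}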
160
161// NewClientStream is a wrapper for ClientConn.NewStream.
162func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
163 return cc.NewStream(ctx, desc, method, opts...)
164}
165
166func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
167 if channelz.IsOn() {
168 cc.incrCallsStarted()
169 defer func() {
170 if err != nil {
171 cc.incrCallsFailed()
172 }
173 }()
174 }
175 // Provide an opportunity for the first RPC to see the first service config
176 // provided by the resolver.
177 if err := cc.waitForResolvedAddrs(ctx); err != nil {
178 return nil, err
179 }
180
181 var mc serviceconfig.MethodConfig
182 var onCommit func()
183 var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
184 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
185 }
186
187 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
188 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
189 if err != nil {
190 return nil, toRPCErr(err)
191 }
192
193 if rpcConfig != nil {
194 if rpcConfig.Context != nil {
195 ctx = rpcConfig.Context
196 }
197 mc = rpcConfig.MethodConfig
198 onCommit = rpcConfig.OnCommitted
199 if rpcConfig.Interceptor != nil {
200 rpcInfo.Context = nil
201 ns := newStream
202 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
203 cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
204 if err != nil {
205 return nil, toRPCErr(err)
206 }
207 return cs, nil
208 }
209 }
210 }
211
212 return newStream(ctx, func() {})
213}
214
215func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
216 c := defaultCallInfo()
217 if mc.WaitForReady != nil {
218 c.failFast = !*mc.WaitForReady
219 }
220
221 // Possible context leak:
222 // The cancel function for the child context we create will only be called
223 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
224 // an error is generated by SendMsg.
225 // https://github.com/grpc/grpc-go/issues/1818.
226 var cancel context.CancelFunc
227 if mc.Timeout != nil && *mc.Timeout >= 0 {
228 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
229 } else {
230 ctx, cancel = context.WithCancel(ctx)
231 }
232 defer func() {
233 if err != nil {
234 cancel()
235 }
236 }()
237
238 for _, o := range opts {
239 if err := o.before(c); err != nil {
240 return nil, toRPCErr(err)
241 }
242 }
243 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
244 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
245 if err := setCallInfoCodec(c); err != nil {
246 return nil, err
247 }
248
249 callHdr := &transport.CallHdr{
250 Host: cc.authority,
251 Method: method,
252 ContentSubtype: c.contentSubtype,
253 DoneFunc: doneFunc,
254 }
255
256 // Set our outgoing compression according to the UseCompressor CallOption, if
257 // set. In that case, also find the compressor from the encoding package.
258 // Otherwise, use the compressor configured by the WithCompressor DialOption,
259 // if set.
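	// (Callers opt in per RPC with the UseCompressor call option, e.g.
	// UseCompressor("gzip"), provided a matching encoding.Compressor has been
	// registered, for instance by importing google.golang.org/grpc/encoding/gzip.)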
260 var cp Compressor
261 var comp encoding.Compressor
262 if ct := c.compressorType; ct != "" {
263 callHdr.SendCompress = ct
264 if ct != encoding.Identity {
265 comp = encoding.GetCompressor(ct)
266 if comp == nil {
267 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
268 }
269 }
270 } else if cc.dopts.cp != nil {
271 callHdr.SendCompress = cc.dopts.cp.Type()
272 cp = cc.dopts.cp
273 }
274 if c.creds != nil {
275 callHdr.Creds = c.creds
276 }
277
278 cs := &clientStream{
279 callHdr: callHdr,
280 ctx: ctx,
281 methodConfig: &mc,
282 opts: opts,
283 callInfo: c,
284 cc: cc,
285 desc: desc,
286 codec: c.codec,
287 cp: cp,
288 comp: comp,
289 cancel: cancel,
290 firstAttempt: true,
291 onCommit: onCommit,
292 }
293 if !cc.dopts.disableRetry {
294 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
295 }
296 cs.binlog = binarylog.GetMethodLogger(method)
297
298 if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
299 cs.finish(err)
300 return nil, err
301 }
302
303 op := func(a *csAttempt) error { return a.newStream() }
304 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
305 cs.finish(err)
306 return nil, err
307 }
308
309 if cs.binlog != nil {
310 md, _ := metadata.FromOutgoingContext(ctx)
311 logEntry := &binarylog.ClientHeader{
312 OnClientSide: true,
313 Header: md,
314 MethodName: method,
315 Authority: cs.cc.authority,
316 }
317 if deadline, ok := ctx.Deadline(); ok {
318 logEntry.Timeout = time.Until(deadline)
319 if logEntry.Timeout < 0 {
320 logEntry.Timeout = 0
321 }
322 }
323 cs.binlog.Log(logEntry)
324 }
325
326 if desc != unaryStreamDesc {
327 // Listen on cc and stream contexts to cleanup when the user closes the
328 // ClientConn or cancels the stream context. In all other cases, an error
329 // should already be injected into the recv buffer by the transport, which
330 // the client will eventually receive, and then we will cancel the stream's
331 // context in clientStream.finish.
332 go func() {
333 select {
334 case <-cc.ctx.Done():
335 cs.finish(ErrClientConnClosing)
336 case <-ctx.Done():
337 cs.finish(toRPCErr(ctx.Err()))
338 }
339 }()
340 }
341 return cs, nil
342}
343
344// newAttemptLocked creates a new attempt with a transport.
345// If it succeeds, then it replaces clientStream's attempt with this new attempt.
346func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
347 ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
348 method := cs.callHdr.Method
349 sh := cs.cc.dopts.copts.StatsHandler
350 var beginTime time.Time
351 if sh != nil {
352 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
353 beginTime = time.Now()
354 begin := &stats.Begin{
355 Client: true,
356 BeginTime: beginTime,
357 FailFast: cs.callInfo.failFast,
358 IsClientStream: cs.desc.ClientStreams,
359 IsServerStream: cs.desc.ServerStreams,
360 IsTransparentRetryAttempt: isTransparent,
361 }
362 sh.HandleRPC(ctx, begin)
363 }
364
365 var trInfo *traceInfo
366 if EnableTracing {
367 trInfo = &traceInfo{
368 tr: trace.New("grpc.Sent."+methodFamily(method), method),
369 firstLine: firstLine{
370 client: true,
371 },
372 }
373 if deadline, ok := ctx.Deadline(); ok {
374 trInfo.firstLine.deadline = time.Until(deadline)
375 }
376 trInfo.tr.LazyLog(&trInfo.firstLine, false)
377 ctx = trace.NewContext(ctx, trInfo.tr)
378 }
379
380 newAttempt := &csAttempt{
381 ctx: ctx,
382 beginTime: beginTime,
383 cs: cs,
384 dc: cs.cc.dopts.dc,
385 statsHandler: sh,
386 trInfo: trInfo,
387 }
388 defer func() {
389 if retErr != nil {
390 // This attempt is not set in the clientStream, so its finish won't
391 // be called. Call it here for stats and trace in case they are not
392 // nil.
393 newAttempt.finish(retErr)
394 }
395 }()
396
397 if err := ctx.Err(); err != nil {
398 return toRPCErr(err)
399 }
400
401 if cs.cc.parsedTarget.Scheme == "xds" {
402 // Add extra metadata (metadata that will be added by transport) to context
403 // so the balancer can see them.
404 ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
405 "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
406 ))
407 }
408 t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
409 if err != nil {
410 return err
411 }
412 if trInfo != nil {
413 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
414 }
415 newAttempt.t = t
416 newAttempt.done = done
417 cs.attempt = newAttempt
418 return nil
419}
420
421func (a *csAttempt) newStream() error {
422 cs := a.cs
423 cs.callHdr.PreviousAttempts = cs.numRetries
424 s, err := a.t.NewStream(a.ctx, cs.callHdr)
425 if err != nil {
426 // Return without converting to an RPC error so retry code can
427 // inspect.
428 return err
429 }
430 cs.attempt.s = s
431 cs.attempt.p = &parser{r: s}
432 return nil
433}
434
435// clientStream implements a client side Stream.
436type clientStream struct {
437 callHdr *transport.CallHdr
438 opts []CallOption
439 callInfo *callInfo
440 cc *ClientConn
441 desc *StreamDesc
442
443 codec baseCodec
444 cp Compressor
445 comp encoding.Compressor
446
447 cancel context.CancelFunc // cancels all attempts
448
449 sentLast bool // sent an end stream
450
451 methodConfig *MethodConfig
452
453 ctx context.Context // the application's context, wrapped by stats/tracing
454
455 retryThrottler *retryThrottler // The throttler active when the RPC began.
456
457 binlog *binarylog.MethodLogger // Binary logger, can be nil.
458 // serverHeaderBinlogged is a boolean for whether server header has been
459 // logged. The server header will be logged the first time one of the
460 // following happens: stream.Header(), stream.Recv().
461 //
462 // It's only read and used by Recv() and Header(), so it doesn't need to be
463 // synchronized.
464 serverHeaderBinlogged bool
465
466 mu sync.Mutex
467 firstAttempt bool // if true, transparent retry is valid
468 numRetries int // exclusive of transparent retry attempt(s)
469 numRetriesSincePushback int // retries since pushback; to reset backoff
470 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
471 // attempt is the active client stream attempt.
472 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
473 // So, attempt can be nil only inside the newClientStream function when clientStream is first created.
474 // One of the first things done after clientStream's creation is to call newAttemptLocked, which either
475 // assigns a non-nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
476 // then newClientStream calls finish on the clientStream and returns. So, the finish method is the only
477 // place where we need to check if the attempt is nil.
478 attempt *csAttempt
479 // TODO(hedging): hedging will have multiple attempts simultaneously.
480 committed bool // active attempt committed for retry?
481 onCommit func()
482 buffer []func(a *csAttempt) error // operations to replay on retry
483 bufferSize int // current size of buffer
484}
485
486// csAttempt implements a single transport stream attempt within a
487// clientStream.
488type csAttempt struct {
489 ctx context.Context
490 cs *clientStream
491 t transport.ClientTransport
492 s *transport.Stream
493 p *parser
494 done func(balancer.DoneInfo)
495
496 finished bool
497 dc Decompressor
498 decomp encoding.Compressor
499 decompSet bool
500
501 mu sync.Mutex // guards trInfo.tr
502 // trInfo may be nil (if EnableTracing is false).
503 // trInfo.tr is set when created (if EnableTracing is true),
504 // and cleared when the finish method is called.
505 trInfo *traceInfo
506
507 statsHandler stats.Handler
508 beginTime time.Time
509}
510
511func (cs *clientStream) commitAttemptLocked() {
512 if !cs.committed && cs.onCommit != nil {
513 cs.onCommit()
514 }
515 cs.committed = true
516 cs.buffer = nil
517}
518
519func (cs *clientStream) commitAttempt() {
520 cs.mu.Lock()
521 cs.commitAttemptLocked()
522 cs.mu.Unlock()
523}
524
525 // shouldRetry returns a nil error if the RPC should be retried; otherwise it
526 // returns the error that should be returned by the operation. If the RPC
527 // should be retried, the bool indicates whether the retry is transparent.
528func (cs *clientStream) shouldRetry(err error) (bool, error) {
529 if cs.attempt.s == nil {
530 // Error from NewClientStream.
531 nse, ok := err.(*transport.NewStreamError)
532 if !ok {
533 // Unexpected, but assume no I/O was performed and the RPC is not
534 // fatal, so retry indefinitely.
535 return true, nil
536 }
537
538 // Unwrap and convert error.
539 err = toRPCErr(nse.Err)
540
541 // Never retry DoNotRetry errors, which indicate the RPC should not be
542 // retried due to max header list size violation, etc.
543 if nse.DoNotRetry {
544 return false, err
545 }
546
547 // In the event of a non-IO operation error from NewStream, we never
548 // attempted to write anything to the wire, so we can retry
549 // indefinitely.
550 if !nse.DoNotTransparentRetry {
551 return true, nil
552 }
553 }
554 if cs.finished || cs.committed {
555 // RPC is finished or committed; cannot retry.
556 return false, err
557 }
558 // Wait for the trailers.
559 unprocessed := false
560 if cs.attempt.s != nil {
561 <-cs.attempt.s.Done()
562 unprocessed = cs.attempt.s.Unprocessed()
563 }
564 if cs.firstAttempt && unprocessed {
565 // First attempt, stream unprocessed: transparently retry.
566 return true, nil
567 }
568 if cs.cc.dopts.disableRetry {
569 return false, err
570 }
571
572 pushback := 0
573 hasPushback := false
574 if cs.attempt.s != nil {
575 if !cs.attempt.s.TrailersOnly() {
576 return false, err
577 }
578
579 // TODO(retry): Move down if the spec changes to not check server pushback
580 // before considering this a failure for throttling.
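		// A trailer such as "grpc-retry-pushback-ms: 2000" asks the client to
		// wait 2s before the next attempt; a malformed or negative value means
		// the server is telling the client not to retry at all.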
581 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
582 if len(sps) == 1 {
583 var e error
584 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
585 channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
586 cs.retryThrottler.throttle() // This counts as a failure for throttling.
587 return false, err
588 }
589 hasPushback = true
590 } else if len(sps) > 1 {
591 channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
592 cs.retryThrottler.throttle() // This counts as a failure for throttling.
593 return false, err
594 }
595 }
596
597 var code codes.Code
598 if cs.attempt.s != nil {
599 code = cs.attempt.s.Status().Code()
600 } else {
601 code = status.Convert(err).Code()
602 }
603
604 rp := cs.methodConfig.RetryPolicy
605 if rp == nil || !rp.RetryableStatusCodes[code] {
606 return false, err
607 }
608
609 // Note: the ordering here is important; we count this as a failure
610 // only if the code matched a retryable code.
611 if cs.retryThrottler.throttle() {
612 return false, err
613 }
614 if cs.numRetries+1 >= rp.MaxAttempts {
615 return false, err
616 }
617
618 var dur time.Duration
619 if hasPushback {
620 dur = time.Millisecond * time.Duration(pushback)
621 cs.numRetriesSincePushback = 0
622 } else {
623 fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
624 cur := float64(rp.InitialBackoff) * fact
625 if max := float64(rp.MaxBackoff); cur > max {
626 cur = max
627 }
628 dur = time.Duration(grpcrand.Int63n(int64(cur)))
629 cs.numRetriesSincePushback++
630 }
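	// For example, with InitialBackoff=1s, BackoffMultiplier=2 and
	// MaxBackoff=10s, the attempt after two prior (non-pushback) retries draws
	// dur uniformly from [0, min(1s*2^2, 10s)) = [0, 4s).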
631
632 // TODO(dfawley): we could eagerly fail here if dur puts us past the
633 // deadline, but unsure if it is worth doing.
634 t := time.NewTimer(dur)
635 select {
636 case <-t.C:
637 cs.numRetries++
638 return false, nil
639 case <-cs.ctx.Done():
640 t.Stop()
641 return false, status.FromContextError(cs.ctx.Err()).Err()
642 }
643}
644
645// Returns nil if a retry was performed and succeeded; error otherwise.
646func (cs *clientStream) retryLocked(lastErr error) error {
647 for {
648 cs.attempt.finish(toRPCErr(lastErr))
649 isTransparent, err := cs.shouldRetry(lastErr)
650 if err != nil {
651 cs.commitAttemptLocked()
652 return err
653 }
654 cs.firstAttempt = false
655 if err := cs.newAttemptLocked(isTransparent); err != nil {
656 return err
657 }
658 if lastErr = cs.replayBufferLocked(); lastErr == nil {
659 return nil
660 }
661 }
662}
663
664func (cs *clientStream) Context() context.Context {
665 cs.commitAttempt()
666 // No need to lock before using attempt, since we know it is committed and
667 // cannot change.
668 return cs.attempt.s.Context()
669}
670
671func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
672 cs.mu.Lock()
673 for {
674 if cs.committed {
675 cs.mu.Unlock()
676 // toRPCErr is used in case the error from the attempt comes from
677 // NewClientStream, which intentionally doesn't return a status
678 // error to allow for further inspection; all other errors should
679 // already be status errors.
680 return toRPCErr(op(cs.attempt))
681 }
682 a := cs.attempt
683 cs.mu.Unlock()
684 err := op(a)
685 cs.mu.Lock()
686 if a != cs.attempt {
687 // We started another attempt already.
688 continue
689 }
690 if err == io.EOF {
691 <-a.s.Done()
692 }
693 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
694 onSuccess()
695 cs.mu.Unlock()
696 return err
697 }
698 if err := cs.retryLocked(err); err != nil {
699 cs.mu.Unlock()
700 return err
701 }
702 }
703}
704
705func (cs *clientStream) Header() (metadata.MD, error) {
706 var m metadata.MD
707 err := cs.withRetry(func(a *csAttempt) error {
708 var err error
709 m, err = a.s.Header()
710 return toRPCErr(err)
711 }, cs.commitAttemptLocked)
712 if err != nil {
713 cs.finish(err)
714 return nil, err
715 }
716 if cs.binlog != nil && !cs.serverHeaderBinlogged {
717 // Only log if binary log is on and header has not been logged.
718 logEntry := &binarylog.ServerHeader{
719 OnClientSide: true,
720 Header: m,
721 PeerAddr: nil,
722 }
723 if peer, ok := peer.FromContext(cs.Context()); ok {
724 logEntry.PeerAddr = peer.Addr
725 }
726 cs.binlog.Log(logEntry)
727 cs.serverHeaderBinlogged = true
728 }
729 return m, err
730}
731
732func (cs *clientStream) Trailer() metadata.MD {
733 // On RPC failure, we never need to retry, because it is only valid to call
734 // this function after RecvMsg() has returned a non-nil error.
735 // We would have retried earlier if necessary.
736 //
737 // Commit the attempt anyway, just in case users are not following those
738 // directions -- it will prevent races and should not meaningfully impact
739 // performance.
740 cs.commitAttempt()
741 if cs.attempt.s == nil {
742 return nil
743 }
744 return cs.attempt.s.Trailer()
745}
746
747func (cs *clientStream) replayBufferLocked() error {
748 a := cs.attempt
749 for _, f := range cs.buffer {
750 if err := f(a); err != nil {
751 return err
752 }
753 }
754 return nil
755}
756
757func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
758 // Note: we still will buffer if retry is disabled (for transparent retries).
759 if cs.committed {
760 return
761 }
762 cs.bufferSize += sz
763 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
764 cs.commitAttemptLocked()
765 return
766 }
767 cs.buffer = append(cs.buffer, op)
768}
769
770func (cs *clientStream) SendMsg(m interface{}) (err error) {
771 defer func() {
772 if err != nil && err != io.EOF {
773 // Call finish on the client stream for errors generated by this SendMsg
774 // call, as these indicate problems created by this client. (Transport
775 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
776 // error will be returned from RecvMsg eventually in that case, or be
777 // retried.)
778 cs.finish(err)
779 }
780 }()
781 if cs.sentLast {
782 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
783 }
784 if !cs.desc.ClientStreams {
785 cs.sentLast = true
786 }
787
788 // load hdr, payload, data
789 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
790 if err != nil {
791 return err
792 }
793
794 // TODO(dfawley): should we be checking len(data) instead?
795 if len(payload) > *cs.callInfo.maxSendMessageSize {
796 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
797 }
798 msgBytes := data // Store the pointer before setting to nil. For binary logging.
799 op := func(a *csAttempt) error {
800 err := a.sendMsg(m, hdr, payload, data)
801 // nil out the message and uncomp when replaying; they are only needed for
802 // stats which is disabled for subsequent attempts.
803 m, data = nil, nil
804 return err
805 }
806 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
807 if cs.binlog != nil && err == nil {
808 cs.binlog.Log(&binarylog.ClientMessage{
809 OnClientSide: true,
810 Message: msgBytes,
811 })
812 }
813 return
814}
815
816func (cs *clientStream) RecvMsg(m interface{}) error {
817 if cs.binlog != nil && !cs.serverHeaderBinlogged {
818 // Call Header() to binary log header if it's not already logged.
819 cs.Header()
820 }
821 var recvInfo *payloadInfo
822 if cs.binlog != nil {
823 recvInfo = &payloadInfo{}
824 }
825 err := cs.withRetry(func(a *csAttempt) error {
826 return a.recvMsg(m, recvInfo)
827 }, cs.commitAttemptLocked)
828 if cs.binlog != nil && err == nil {
829 cs.binlog.Log(&binarylog.ServerMessage{
830 OnClientSide: true,
831 Message: recvInfo.uncompressedBytes,
832 })
833 }
834 if err != nil || !cs.desc.ServerStreams {
835 // err != nil or non-server-streaming indicates end of stream.
836 cs.finish(err)
837
838 if cs.binlog != nil {
839 // finish will not log Trailer. Log Trailer here.
840 logEntry := &binarylog.ServerTrailer{
841 OnClientSide: true,
842 Trailer: cs.Trailer(),
843 Err: err,
844 }
845 if logEntry.Err == io.EOF {
846 logEntry.Err = nil
847 }
848 if peer, ok := peer.FromContext(cs.Context()); ok {
849 logEntry.PeerAddr = peer.Addr
850 }
851 cs.binlog.Log(logEntry)
852 }
853 }
854 return err
855}
856
857func (cs *clientStream) CloseSend() error {
858 if cs.sentLast {
859 // TODO: return an error and finish the stream instead, due to API misuse?
860 return nil
861 }
862 cs.sentLast = true
863 op := func(a *csAttempt) error {
864 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
865 // Always return nil; io.EOF is the only error that might make sense
866 // instead, but there is no need to signal the client to call RecvMsg
867 // as the only use left for the stream after CloseSend is to call
868 // RecvMsg. This also matches historical behavior.
869 return nil
870 }
871 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
872 if cs.binlog != nil {
873 cs.binlog.Log(&binarylog.ClientHalfClose{
874 OnClientSide: true,
875 })
876 }
877 // We intentionally never return an error here; see the comment inside op above.
878 return nil
879}
880
881func (cs *clientStream) finish(err error) {
882 if err == io.EOF {
883 // Ending a stream with EOF indicates a success.
884 err = nil
885 }
886 cs.mu.Lock()
887 if cs.finished {
888 cs.mu.Unlock()
889 return
890 }
891 cs.finished = true
892 cs.commitAttemptLocked()
893 if cs.attempt != nil {
894 cs.attempt.finish(err)
895 // after functions all rely upon having a stream.
896 if cs.attempt.s != nil {
897 for _, o := range cs.opts {
898 o.after(cs.callInfo, cs.attempt)
899 }
900 }
901 }
902 cs.mu.Unlock()
903 // For binary logging: only log cancel in finish (could be caused by RPC ctx
904 // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
905 //
906 // Only one of cancel or trailer needs to be logged. In the cases where
907 // users don't call RecvMsg, users must have already canceled the RPC.
908 if cs.binlog != nil && status.Code(err) == codes.Canceled {
909 cs.binlog.Log(&binarylog.Cancel{
910 OnClientSide: true,
911 })
912 }
913 if err == nil {
914 cs.retryThrottler.successfulRPC()
915 }
916 if channelz.IsOn() {
917 if err != nil {
918 cs.cc.incrCallsFailed()
919 } else {
920 cs.cc.incrCallsSucceeded()
921 }
922 }
923 cs.cancel()
924}
925
926func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
927 cs := a.cs
928 if a.trInfo != nil {
929 a.mu.Lock()
930 if a.trInfo.tr != nil {
931 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
932 }
933 a.mu.Unlock()
934 }
935 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
936 if !cs.desc.ClientStreams {
937 // For non-client-streaming RPCs, we return nil instead of EOF on error
938 // because the generated code requires it. finish is not called; RecvMsg()
939 // will call it with the stream's status independently.
940 return nil
941 }
942 return io.EOF
943 }
944 if a.statsHandler != nil {
945 a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
946 }
947 if channelz.IsOn() {
948 a.t.IncrMsgSent()
949 }
950 return nil
951}
952
953func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
954 cs := a.cs
955 if a.statsHandler != nil && payInfo == nil {
956 payInfo = &payloadInfo{}
957 }
958
959 if !a.decompSet {
960 // Block until we receive headers containing received message encoding.
961 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
962 if a.dc == nil || a.dc.Type() != ct {
963 // No configured decompressor, or it does not match the incoming
964 // message encoding; attempt to find a registered compressor that does.
965 a.dc = nil
966 a.decomp = encoding.GetCompressor(ct)
967 }
968 } else {
969 // No compression is used; disable our decompressor.
970 a.dc = nil
971 }
972 // Only initialize this state once per stream.
973 a.decompSet = true
974 }
975 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
976 if err != nil {
977 if err == io.EOF {
978 if statusErr := a.s.Status().Err(); statusErr != nil {
979 return statusErr
980 }
981 return io.EOF // indicates successful end of stream.
982 }
983 return toRPCErr(err)
984 }
985 if a.trInfo != nil {
986 a.mu.Lock()
987 if a.trInfo.tr != nil {
988 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
989 }
990 a.mu.Unlock()
991 }
992 if a.statsHandler != nil {
993 a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
994 Client: true,
995 RecvTime: time.Now(),
996 Payload: m,
997 // TODO truncate large payload.
998 Data: payInfo.uncompressedBytes,
999 WireLength: payInfo.wireLength + headerLen,
1000 Length: len(payInfo.uncompressedBytes),
1001 })
1002 }
1003 if channelz.IsOn() {
1004 a.t.IncrMsgRecv()
1005 }
1006 if cs.desc.ServerStreams {
1007 // Subsequent messages should be received by subsequent RecvMsg calls.
1008 return nil
1009 }
1010 // Special handling for non-server-stream rpcs.
1011 // This recv expects EOF or errors, so we don't collect inPayload.
1012 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
1013 if err == nil {
1014 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1015 }
1016 if err == io.EOF {
1017 return a.s.Status().Err() // non-server streaming Recv returns nil on success
1018 }
1019 return toRPCErr(err)
1020}
1021
1022func (a *csAttempt) finish(err error) {
1023 a.mu.Lock()
1024 if a.finished {
1025 a.mu.Unlock()
1026 return
1027 }
1028 a.finished = true
1029 if err == io.EOF {
1030 // Ending a stream with EOF indicates a success.
1031 err = nil
1032 }
1033 var tr metadata.MD
1034 if a.s != nil {
1035 a.t.CloseStream(a.s, err)
1036 tr = a.s.Trailer()
1037 }
1038
1039 if a.done != nil {
1040 br := false
1041 if a.s != nil {
1042 br = a.s.BytesReceived()
1043 }
1044 a.done(balancer.DoneInfo{
1045 Err: err,
1046 Trailer: tr,
1047 BytesSent: a.s != nil,
1048 BytesReceived: br,
1049 ServerLoad: balancerload.Parse(tr),
1050 })
1051 }
1052 if a.statsHandler != nil {
1053 end := &stats.End{
1054 Client: true,
1055 BeginTime: a.beginTime,
1056 EndTime: time.Now(),
1057 Trailer: tr,
1058 Error: err,
1059 }
1060 a.statsHandler.HandleRPC(a.ctx, end)
1061 }
1062 if a.trInfo != nil && a.trInfo.tr != nil {
1063 if err == nil {
1064 a.trInfo.tr.LazyPrintf("RPC: [OK]")
1065 } else {
1066 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1067 a.trInfo.tr.SetError()
1068 }
1069 a.trInfo.tr.Finish()
1070 a.trInfo.tr = nil
1071 }
1072 a.mu.Unlock()
1073}
1074
1075// newClientStream creates a ClientStream with the specified transport, on the
1076// given addrConn.
1077//
1078// It's expected that the given transport is either the same one in addrConn, or
1079 // is already closed. To avoid a race, the transport is specified separately, instead
1080 // of using ac.transport.
1081//
1082// Main difference between this and ClientConn.NewStream:
1083// - no retry
1084// - no service config (or wait for service config)
1085// - no tracing or stats
1086func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1087 if t == nil {
1088 // TODO: return RPC error here?
1089 return nil, errors.New("transport provided is nil")
1090 }
1091 // defaultCallInfo contains unnecessary info (i.e. failFast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1092 c := &callInfo{}
1093
1094 // Possible context leak:
1095 // The cancel function for the child context we create will only be called
1096 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1097 // an error is generated by SendMsg.
1098 // https://github.com/grpc/grpc-go/issues/1818.
1099 ctx, cancel := context.WithCancel(ctx)
1100 defer func() {
1101 if err != nil {
1102 cancel()
1103 }
1104 }()
1105
1106 for _, o := range opts {
1107 if err := o.before(c); err != nil {
1108 return nil, toRPCErr(err)
1109 }
1110 }
1111 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1112 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1113 if err := setCallInfoCodec(c); err != nil {
1114 return nil, err
1115 }
1116
1117 callHdr := &transport.CallHdr{
1118 Host: ac.cc.authority,
1119 Method: method,
1120 ContentSubtype: c.contentSubtype,
1121 }
1122
1123 // Set our outgoing compression according to the UseCompressor CallOption, if
1124 // set. In that case, also find the compressor from the encoding package.
1125 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1126 // if set.
1127 var cp Compressor
1128 var comp encoding.Compressor
1129 if ct := c.compressorType; ct != "" {
1130 callHdr.SendCompress = ct
1131 if ct != encoding.Identity {
1132 comp = encoding.GetCompressor(ct)
1133 if comp == nil {
1134 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1135 }
1136 }
1137 } else if ac.cc.dopts.cp != nil {
1138 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1139 cp = ac.cc.dopts.cp
1140 }
1141 if c.creds != nil {
1142 callHdr.Creds = c.creds
1143 }
1144
1145 // Use a special addrConnStream to avoid retry.
1146 as := &addrConnStream{
1147 callHdr: callHdr,
1148 ac: ac,
1149 ctx: ctx,
1150 cancel: cancel,
1151 opts: opts,
1152 callInfo: c,
1153 desc: desc,
1154 codec: c.codec,
1155 cp: cp,
1156 comp: comp,
1157 t: t,
1158 }
1159
1160 s, err := as.t.NewStream(as.ctx, as.callHdr)
1161 if err != nil {
1162 err = toRPCErr(err)
1163 return nil, err
1164 }
1165 as.s = s
1166 as.p = &parser{r: s}
1167 ac.incrCallsStarted()
1168 if desc != unaryStreamDesc {
1169 // Listen on cc and stream contexts to cleanup when the user closes the
1170 // ClientConn or cancels the stream context. In all other cases, an error
1171 // should already be injected into the recv buffer by the transport, which
1172 // the client will eventually receive, and then we will cancel the stream's
1173 // context in clientStream.finish.
1174 go func() {
1175 select {
1176 case <-ac.ctx.Done():
1177 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1178 case <-ctx.Done():
1179 as.finish(toRPCErr(ctx.Err()))
1180 }
1181 }()
1182 }
1183 return as, nil
1184}
1185
1186type addrConnStream struct {
1187 s *transport.Stream
1188 ac *addrConn
1189 callHdr *transport.CallHdr
1190 cancel context.CancelFunc
1191 opts []CallOption
1192 callInfo *callInfo
1193 t transport.ClientTransport
1194 ctx context.Context
1195 sentLast bool
1196 desc *StreamDesc
1197 codec baseCodec
1198 cp Compressor
1199 comp encoding.Compressor
1200 decompSet bool
1201 dc Decompressor
1202 decomp encoding.Compressor
1203 p *parser
1204 mu sync.Mutex
1205 finished bool
1206}
1207
1208func (as *addrConnStream) Header() (metadata.MD, error) {
1209 m, err := as.s.Header()
1210 if err != nil {
1211 as.finish(toRPCErr(err))
1212 }
1213 return m, err
1214}
1215
1216func (as *addrConnStream) Trailer() metadata.MD {
1217 return as.s.Trailer()
1218}
1219
1220func (as *addrConnStream) CloseSend() error {
1221 if as.sentLast {
1222 // TODO: return an error and finish the stream instead, due to API misuse?
1223 return nil
1224 }
1225 as.sentLast = true
1226
1227 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1228 // Always return nil; io.EOF is the only error that might make sense
1229 // instead, but there is no need to signal the client to call RecvMsg
1230 // as the only use left for the stream after CloseSend is to call
1231 // RecvMsg. This also matches historical behavior.
1232 return nil
1233}
1234
1235func (as *addrConnStream) Context() context.Context {
1236 return as.s.Context()
1237}
1238
1239func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1240 defer func() {
1241 if err != nil && err != io.EOF {
1242 // Call finish on the client stream for errors generated by this SendMsg
1243 // call, as these indicate problems created by this client. (Transport
1244 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1245 // error will be returned from RecvMsg eventually in that case, or be
1246 // retried.)
1247 as.finish(err)
1248 }
1249 }()
1250 if as.sentLast {
1251 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1252 }
1253 if !as.desc.ClientStreams {
1254 as.sentLast = true
1255 }
1256
1257 // load hdr, payload, data
1258 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
1259 if err != nil {
1260 return err
1261 }
1262
1263 // TODO(dfawley): should we be checking len(data) instead?
1264 if len(payld) > *as.callInfo.maxSendMessageSize {
1265 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1266 }
1267
1268 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1269 if !as.desc.ClientStreams {
1270 // For non-client-streaming RPCs, we return nil instead of EOF on error
1271 // because the generated code requires it. finish is not called; RecvMsg()
1272 // will call it with the stream's status independently.
1273 return nil
1274 }
1275 return io.EOF
1276 }
1277
1278 if channelz.IsOn() {
1279 as.t.IncrMsgSent()
1280 }
1281 return nil
1282}
1283
1284func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1285 defer func() {
1286 if err != nil || !as.desc.ServerStreams {
1287 // err != nil or non-server-streaming indicates end of stream.
1288 as.finish(err)
1289 }
1290 }()
1291
1292 if !as.decompSet {
1293 // Block until we receive headers containing received message encoding.
1294 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1295 if as.dc == nil || as.dc.Type() != ct {
1296 // No configured decompressor, or it does not match the incoming
1297 // message encoding; attempt to find a registered compressor that does.
1298 as.dc = nil
1299 as.decomp = encoding.GetCompressor(ct)
1300 }
1301 } else {
1302 // No compression is used; disable our decompressor.
1303 as.dc = nil
1304 }
1305 // Only initialize this state once per stream.
1306 as.decompSet = true
1307 }
1308 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1309 if err != nil {
1310 if err == io.EOF {
1311 if statusErr := as.s.Status().Err(); statusErr != nil {
1312 return statusErr
1313 }
1314 return io.EOF // indicates successful end of stream.
1315 }
1316 return toRPCErr(err)
1317 }
1318
1319 if channelz.IsOn() {
1320 as.t.IncrMsgRecv()
1321 }
1322 if as.desc.ServerStreams {
1323 // Subsequent messages should be received by subsequent RecvMsg calls.
1324 return nil
1325 }
1326
1327 // Special handling for non-server-stream rpcs.
1328 // This recv expects EOF or errors, so we don't collect inPayload.
1329 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1330 if err == nil {
1331 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1332 }
1333 if err == io.EOF {
1334 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1335 }
1336 return toRPCErr(err)
1337}
1338
1339func (as *addrConnStream) finish(err error) {
1340 as.mu.Lock()
1341 if as.finished {
1342 as.mu.Unlock()
1343 return
1344 }
1345 as.finished = true
1346 if err == io.EOF {
1347 // Ending a stream with EOF indicates a success.
1348 err = nil
1349 }
1350 if as.s != nil {
1351 as.t.CloseStream(as.s, err)
1352 }
1353
1354 if err != nil {
1355 as.ac.incrCallsFailed()
1356 } else {
1357 as.ac.incrCallsSucceeded()
1358 }
1359 as.cancel()
1360 as.mu.Unlock()
1361}
1362
1363// ServerStream defines the server-side behavior of a streaming RPC.
1364//
1365// All errors returned from ServerStream methods are compatible with the
1366// status package.
1367type ServerStream interface {
1368 // SetHeader sets the header metadata. It may be called multiple times.
1369 // When called multiple times, all the provided metadata will be merged.
1370 // All the metadata will be sent out when one of the following happens:
1371 // - ServerStream.SendHeader() is called;
1372 // - The first response is sent out;
1373 // - An RPC status is sent out (error or success).
1374 SetHeader(metadata.MD) error
1375 // SendHeader sends the header metadata.
1376 // The provided md and headers set by SetHeader() will be sent.
1377 // It fails if called multiple times.
1378 SendHeader(metadata.MD) error
1379 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1380 // When called more than once, all the provided metadata will be merged.
1381 SetTrailer(metadata.MD)
1382 // Context returns the context for this stream.
1383 Context() context.Context
1384 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1385 // error is returned directly.
1386 //
1387 // SendMsg blocks until:
1388 // - There is sufficient flow control to schedule m with the transport, or
1389 // - The stream is done, or
1390 // - The stream breaks.
1391 //
1392 // SendMsg does not wait until the message is received by the client. An
1393 // untimely stream closure may result in lost messages.
1394 //
1395 // It is safe to have a goroutine calling SendMsg and another goroutine
1396 // calling RecvMsg on the same stream at the same time, but it is not safe
1397 // to call SendMsg on the same stream in different goroutines.
1398 SendMsg(m interface{}) error
1399 // RecvMsg blocks until it receives a message into m or the stream is
1400 // done. It returns io.EOF when the client has performed a CloseSend. On
1401 // any non-EOF error, the stream is aborted and the error contains the
1402 // RPC status.
1403 //
1404 // It is safe to have a goroutine calling SendMsg and another goroutine
1405 // calling RecvMsg on the same stream at the same time, but it is not
1406 // safe to call RecvMsg on the same stream in different goroutines.
1407 RecvMsg(m interface{}) error
1408}
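// Illustrative sketch (not part of the gRPC API): a bidirectional echo handler
// built on the ServerStream interface above. newMsg is a hypothetical factory
// returning a fresh pointer to the method's generated request type.
func exampleEchoHandler(newMsg func() interface{}) StreamHandler {
	return func(srv interface{}, stream ServerStream) error {
		for {
			m := newMsg()
			if err := stream.RecvMsg(m); err != nil {
				if err == io.EOF {
					return nil // the client called CloseSend; finish the RPC successfully
				}
				return err
			}
			if err := stream.SendMsg(m); err != nil {
				return err
			}
		}
	}
}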
1409
1410// serverStream implements a server side Stream.
1411type serverStream struct {
1412 ctx context.Context
1413 t transport.ServerTransport
1414 s *transport.Stream
1415 p *parser
1416 codec baseCodec
1417
1418 cp Compressor
1419 dc Decompressor
1420 comp encoding.Compressor
1421 decomp encoding.Compressor
1422
1423 maxReceiveMessageSize int
1424 maxSendMessageSize int
1425 trInfo *traceInfo
1426
1427 statsHandler stats.Handler
1428
1429 binlog *binarylog.MethodLogger
1430 // serverHeaderBinlogged indicates whether server header has been logged. It
1431 // will be set the first time one of the following happens: stream.SendHeader(),
1432 // stream.Send().
1433 //
1434 // It's only checked in send and sendHeader, doesn't need to be
1435 // synchronized.
1436 serverHeaderBinlogged bool
1437
1438 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1439}
1440
1441func (ss *serverStream) Context() context.Context {
1442 return ss.ctx
1443}
1444
1445func (ss *serverStream) SetHeader(md metadata.MD) error {
1446 if md.Len() == 0 {
1447 return nil
1448 }
1449 return ss.s.SetHeader(md)
1450}
1451
1452func (ss *serverStream) SendHeader(md metadata.MD) error {
1453 err := ss.t.WriteHeader(ss.s, md)
1454 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1455 h, _ := ss.s.Header()
1456 ss.binlog.Log(&binarylog.ServerHeader{
1457 Header: h,
1458 })
1459 ss.serverHeaderBinlogged = true
1460 }
1461 return err
1462}
1463
1464func (ss *serverStream) SetTrailer(md metadata.MD) {
1465 if md.Len() == 0 {
1466 return
1467 }
1468 ss.s.SetTrailer(md)
1469}
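// Illustrative sketch (hypothetical metadata keys): attaching header and
// trailer metadata from a handler, per the ServerStream documentation above.
func exampleSetMetadata(stream ServerStream) error {
	// Header metadata is sent with SendHeader or the first response message.
	if err := stream.SetHeader(metadata.Pairs("x-request-id", "abc123")); err != nil {
		return err
	}
	// Trailer metadata is sent together with the final RPC status.
	stream.SetTrailer(metadata.Pairs("x-served-by", "example-backend"))
	return nil
}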
1470
1471func (ss *serverStream) SendMsg(m interface{}) (err error) {
1472 defer func() {
1473 if ss.trInfo != nil {
1474 ss.mu.Lock()
1475 if ss.trInfo.tr != nil {
1476 if err == nil {
1477 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1478 } else {
1479 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1480 ss.trInfo.tr.SetError()
1481 }
1482 }
1483 ss.mu.Unlock()
1484 }
1485 if err != nil && err != io.EOF {
1486 st, _ := status.FromError(toRPCErr(err))
1487 ss.t.WriteStatus(ss.s, st)
1488 // A non-user-specified status was sent out. This should be an error
1489 // case (e.g. a server-side Cancel).
1490 //
1491 // This is not handled specifically now. The user will return a final
1492 // status from the service handler; we will log that error instead.
1493 // This behavior is similar to an interceptor.
1494 }
1495 if channelz.IsOn() && err == nil {
1496 ss.t.IncrMsgSent()
1497 }
1498 }()
1499
1500 // load hdr, payload, data
1501 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
1502 if err != nil {
1503 return err
1504 }
1505
1506 // TODO(dfawley): should we be checking len(data) instead?
1507 if len(payload) > ss.maxSendMessageSize {
1508 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1509 }
1510 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1511 return toRPCErr(err)
1512 }
1513 if ss.binlog != nil {
1514 if !ss.serverHeaderBinlogged {
1515 h, _ := ss.s.Header()
1516 ss.binlog.Log(&binarylog.ServerHeader{
1517 Header: h,
1518 })
1519 ss.serverHeaderBinlogged = true
1520 }
1521 ss.binlog.Log(&binarylog.ServerMessage{
1522 Message: data,
1523 })
1524 }
1525 if ss.statsHandler != nil {
1526 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1527 }
1528 return nil
1529}
1530
1531func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1532 defer func() {
1533 if ss.trInfo != nil {
1534 ss.mu.Lock()
1535 if ss.trInfo.tr != nil {
1536 if err == nil {
1537 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1538 } else if err != io.EOF {
1539 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1540 ss.trInfo.tr.SetError()
1541 }
1542 }
1543 ss.mu.Unlock()
1544 }
1545 if err != nil && err != io.EOF {
1546 st, _ := status.FromError(toRPCErr(err))
1547 ss.t.WriteStatus(ss.s, st)
1548 // A non-user-specified status was sent out. This should be an error
1549 // case (e.g. a server-side Cancel).
1550 //
1551 // This is not handled specifically now. The user will return a final
1552 // status from the service handler; we will log that error instead.
1553 // This behavior is similar to an interceptor.
1554 }
1555 if channelz.IsOn() && err == nil {
1556 ss.t.IncrMsgRecv()
1557 }
1558 }()
1559 var payInfo *payloadInfo
1560 if ss.statsHandler != nil || ss.binlog != nil {
1561 payInfo = &payloadInfo{}
1562 }
1563 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1564 if err == io.EOF {
1565 if ss.binlog != nil {
1566 ss.binlog.Log(&binarylog.ClientHalfClose{})
1567 }
1568 return err
1569 }
1570 if err == io.ErrUnexpectedEOF {
1571 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1572 }
1573 return toRPCErr(err)
1574 }
1575 if ss.statsHandler != nil {
1576 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1577 RecvTime: time.Now(),
1578 Payload: m,
1579 // TODO truncate large payload.
1580 Data: payInfo.uncompressedBytes,
1581 WireLength: payInfo.wireLength + headerLen,
1582 Length: len(payInfo.uncompressedBytes),
1583 })
1584 }
1585 if ss.binlog != nil {
1586 ss.binlog.Log(&binarylog.ClientMessage{
1587 Message: payInfo.uncompressedBytes,
1588 })
1589 }
1590 return nil
1591}
1592
1593// MethodFromServerStream returns the method string for the input stream.
1594// The returned string is in the format of "/service/method".
1595func MethodFromServerStream(stream ServerStream) (string, bool) {
1596 return Method(stream.Context())
1597}
1598
1599 // prepareMsg returns the hdr, payload and data using the passed
1600 // compressors, or, when m is a *PreparedMsg, its pre-encoded
1601 // fields.
1602func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
1603 if preparedMsg, ok := m.(*PreparedMsg); ok {
1604 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
1605 }
1606 // The input interface is not a prepared msg.
1607 // Marshal and Compress the data at this point
1608 data, err = encode(codec, m)
1609 if err != nil {
1610 return nil, nil, nil, err
1611 }
1612 compData, err := compress(data, cp, comp)
1613 if err != nil {
1614 return nil, nil, nil, err
1615 }
1616 hdr, payload = msgHeader(data, compData)
1617 return hdr, payload, data, nil
1618}
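
// Illustrative sketch (not part of the gRPC API): pre-encoding a message with
// PreparedMsg so that prepareMsg above can skip marshalling and compression on
// the send path, e.g. when the same message is sent on many streams.
func exampleSendPrepared(stream ClientStream, msg interface{}) error {
	prep := &PreparedMsg{}
	if err := prep.Encode(stream, msg); err != nil {
		return err
	}
	return stream.SendMsg(prep)
}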