blob: 934ef68321ccefc466192bb91286a2d127268d1f [file] [log] [blame]
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/encoding"
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070034 "google.golang.org/grpc/internal/balancerload"
35 "google.golang.org/grpc/internal/binarylog"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/internal/grpcrand"
38 "google.golang.org/grpc/internal/transport"
39 "google.golang.org/grpc/metadata"
40 "google.golang.org/grpc/peer"
41 "google.golang.org/grpc/stats"
42 "google.golang.org/grpc/status"
43)
44
45// StreamHandler defines the handler called by gRPC server to complete the
46// execution of a streaming RPC. If a StreamHandler returns an error, it
47// should be produced by the status package, or else gRPC will use
48// codes.Unknown as the status code and err.Error() as the status message
49// of the RPC.
50type StreamHandler func(srv interface{}, stream ServerStream) error
51
52// StreamDesc represents a streaming RPC service's method specification.
53type StreamDesc struct {
54 StreamName string
55 Handler StreamHandler
56
57 // At least one of these is true.
58 ServerStreams bool
59 ClientStreams bool
60}
61
// Stream is the set of methods shared by client-side and server-side
// streams.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}
73
74// ClientStream defines the client-side behavior of a streaming RPC.
75//
76// All errors returned from ClientStream methods are compatible with the
77// status package.
78type ClientStream interface {
79 // Header returns the header metadata received from the server if there
80 // is any. It blocks if the metadata is not ready to read.
81 Header() (metadata.MD, error)
82 // Trailer returns the trailer metadata from the server, if there is any.
83 // It must only be called after stream.CloseAndRecv has returned, or
84 // stream.Recv has returned a non-nil error (including io.EOF).
85 Trailer() metadata.MD
86 // CloseSend closes the send direction of the stream. It closes the stream
87 // when non-nil error is met. It is also not safe to call CloseSend
88 // concurrently with SendMsg.
89 CloseSend() error
90 // Context returns the context for this stream.
91 //
92 // It should not be called until after Header or RecvMsg has returned. Once
93 // called, subsequent client-side retries are disabled.
94 Context() context.Context
95 // SendMsg is generally called by generated code. On error, SendMsg aborts
96 // the stream. If the error was generated by the client, the status is
97 // returned directly; otherwise, io.EOF is returned and the status of
98 // the stream may be discovered using RecvMsg.
99 //
100 // SendMsg blocks until:
101 // - There is sufficient flow control to schedule m with the transport, or
102 // - The stream is done, or
103 // - The stream breaks.
104 //
105 // SendMsg does not wait until the message is received by the server. An
106 // untimely stream closure may result in lost messages. To ensure delivery,
107 // users should ensure the RPC completed successfully using RecvMsg.
108 //
109 // It is safe to have a goroutine calling SendMsg and another goroutine
110 // calling RecvMsg on the same stream at the same time, but it is not safe
111 // to call SendMsg on the same stream in different goroutines. It is also
112 // not safe to call CloseSend concurrently with SendMsg.
113 SendMsg(m interface{}) error
114 // RecvMsg blocks until it receives a message into m or the stream is
115 // done. It returns io.EOF when the stream completes successfully. On
116 // any other error, the stream is aborted and the error contains the RPC
117 // status.
118 //
119 // It is safe to have a goroutine calling SendMsg and another goroutine
120 // calling RecvMsg on the same stream at the same time, but it is not
121 // safe to call RecvMsg on the same stream in different goroutines.
122 RecvMsg(m interface{}) error
123}
124
125// NewStream creates a new Stream for the client side. This is typically
126// called by generated code. ctx is used for the lifetime of the stream.
127//
128// To ensure resources are not leaked due to the stream returned, one of the following
129// actions must be performed:
130//
131// 1. Call Close on the ClientConn.
132// 2. Cancel the context provided.
133// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
134// client-streaming RPC, for instance, might use the helper function
135// CloseAndRecv (note that CloseSend does not Recv, therefore is not
136// guaranteed to release all resources).
137// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
138//
139// If none of the above happen, a goroutine and a context will be leaked, and grpc
140// will not call the optionally-configured stats handler with a stats.End message.
141func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
142 // allow interceptor to see all applicable call options, which means those
143 // configured as defaults from dial option as well as per-call options
144 opts = combine(cc.dopts.callOptions, opts)
145
146 if cc.dopts.streamInt != nil {
147 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
148 }
149 return newClientStream(ctx, desc, cc, method, opts...)
150}
151
152// NewClientStream is a wrapper for ClientConn.NewStream.
153func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
154 return cc.NewStream(ctx, desc, method, opts...)
155}
156
157func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
158 if channelz.IsOn() {
159 cc.incrCallsStarted()
160 defer func() {
161 if err != nil {
162 cc.incrCallsFailed()
163 }
164 }()
165 }
166 c := defaultCallInfo()
167 // Provide an opportunity for the first RPC to see the first service config
168 // provided by the resolver.
169 if err := cc.waitForResolvedAddrs(ctx); err != nil {
170 return nil, err
171 }
172 mc := cc.GetMethodConfig(method)
173 if mc.WaitForReady != nil {
174 c.failFast = !*mc.WaitForReady
175 }
176
177 // Possible context leak:
178 // The cancel function for the child context we create will only be called
179 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
180 // an error is generated by SendMsg.
181 // https://github.com/grpc/grpc-go/issues/1818.
182 var cancel context.CancelFunc
183 if mc.Timeout != nil && *mc.Timeout >= 0 {
184 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
185 } else {
186 ctx, cancel = context.WithCancel(ctx)
187 }
188 defer func() {
189 if err != nil {
190 cancel()
191 }
192 }()
193
194 for _, o := range opts {
195 if err := o.before(c); err != nil {
196 return nil, toRPCErr(err)
197 }
198 }
199 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
200 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
201 if err := setCallInfoCodec(c); err != nil {
202 return nil, err
203 }
204
205 callHdr := &transport.CallHdr{
206 Host: cc.authority,
207 Method: method,
208 ContentSubtype: c.contentSubtype,
209 }
210
211 // Set our outgoing compression according to the UseCompressor CallOption, if
212 // set. In that case, also find the compressor from the encoding package.
213 // Otherwise, use the compressor configured by the WithCompressor DialOption,
214 // if set.
215 var cp Compressor
216 var comp encoding.Compressor
217 if ct := c.compressorType; ct != "" {
218 callHdr.SendCompress = ct
219 if ct != encoding.Identity {
220 comp = encoding.GetCompressor(ct)
221 if comp == nil {
222 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
223 }
224 }
225 } else if cc.dopts.cp != nil {
226 callHdr.SendCompress = cc.dopts.cp.Type()
227 cp = cc.dopts.cp
228 }
229 if c.creds != nil {
230 callHdr.Creds = c.creds
231 }
232 var trInfo *traceInfo
233 if EnableTracing {
234 trInfo = &traceInfo{
235 tr: trace.New("grpc.Sent."+methodFamily(method), method),
236 firstLine: firstLine{
237 client: true,
238 },
239 }
240 if deadline, ok := ctx.Deadline(); ok {
241 trInfo.firstLine.deadline = time.Until(deadline)
242 }
243 trInfo.tr.LazyLog(&trInfo.firstLine, false)
244 ctx = trace.NewContext(ctx, trInfo.tr)
245 }
246 ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
247 sh := cc.dopts.copts.StatsHandler
248 var beginTime time.Time
249 if sh != nil {
250 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
251 beginTime = time.Now()
252 begin := &stats.Begin{
253 Client: true,
254 BeginTime: beginTime,
255 FailFast: c.failFast,
256 }
257 sh.HandleRPC(ctx, begin)
258 }
259
260 cs := &clientStream{
261 callHdr: callHdr,
262 ctx: ctx,
263 methodConfig: &mc,
264 opts: opts,
265 callInfo: c,
266 cc: cc,
267 desc: desc,
268 codec: c.codec,
269 cp: cp,
270 comp: comp,
271 cancel: cancel,
272 beginTime: beginTime,
273 firstAttempt: true,
274 }
275 if !cc.dopts.disableRetry {
276 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
277 }
278 cs.binlog = binarylog.GetMethodLogger(method)
279
280 cs.callInfo.stream = cs
281 // Only this initial attempt has stats/tracing.
282 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
283 if err := cs.newAttemptLocked(sh, trInfo); err != nil {
284 cs.finish(err)
285 return nil, err
286 }
287
288 op := func(a *csAttempt) error { return a.newStream() }
289 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
290 cs.finish(err)
291 return nil, err
292 }
293
294 if cs.binlog != nil {
295 md, _ := metadata.FromOutgoingContext(ctx)
296 logEntry := &binarylog.ClientHeader{
297 OnClientSide: true,
298 Header: md,
299 MethodName: method,
300 Authority: cs.cc.authority,
301 }
302 if deadline, ok := ctx.Deadline(); ok {
303 logEntry.Timeout = time.Until(deadline)
304 if logEntry.Timeout < 0 {
305 logEntry.Timeout = 0
306 }
307 }
308 cs.binlog.Log(logEntry)
309 }
310
311 if desc != unaryStreamDesc {
312 // Listen on cc and stream contexts to cleanup when the user closes the
313 // ClientConn or cancels the stream context. In all other cases, an error
314 // should already be injected into the recv buffer by the transport, which
315 // the client will eventually receive, and then we will cancel the stream's
316 // context in clientStream.finish.
317 go func() {
318 select {
319 case <-cc.ctx.Done():
320 cs.finish(ErrClientConnClosing)
321 case <-ctx.Done():
322 cs.finish(toRPCErr(ctx.Err()))
323 }
324 }()
325 }
326 return cs, nil
327}
328
Arjun E K57a7fcb2020-01-30 06:44:45 +0000329// newAttemptLocked creates a new attempt with a transport.
330// If it succeeds, then it replaces clientStream's attempt with this new attempt.
331func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
332 newAttempt := &csAttempt{
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700333 cs: cs,
334 dc: cs.cc.dopts.dc,
335 statsHandler: sh,
336 trInfo: trInfo,
337 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000338 defer func() {
339 if retErr != nil {
340 // This attempt is not set in the clientStream, so it's finish won't
341 // be called. Call it here for stats and trace in case they are not
342 // nil.
343 newAttempt.finish(retErr)
344 }
345 }()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700346
347 if err := cs.ctx.Err(); err != nil {
348 return toRPCErr(err)
349 }
350 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
351 if err != nil {
352 return err
353 }
354 if trInfo != nil {
355 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
356 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000357 newAttempt.t = t
358 newAttempt.done = done
359 cs.attempt = newAttempt
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700360 return nil
361}
362
363func (a *csAttempt) newStream() error {
364 cs := a.cs
365 cs.callHdr.PreviousAttempts = cs.numRetries
366 s, err := a.t.NewStream(cs.ctx, cs.callHdr)
367 if err != nil {
368 return toRPCErr(err)
369 }
370 cs.attempt.s = s
371 cs.attempt.p = &parser{r: s}
372 return nil
373}
374
375// clientStream implements a client side Stream.
376type clientStream struct {
377 callHdr *transport.CallHdr
378 opts []CallOption
379 callInfo *callInfo
380 cc *ClientConn
381 desc *StreamDesc
382
383 codec baseCodec
384 cp Compressor
385 comp encoding.Compressor
386
387 cancel context.CancelFunc // cancels all attempts
388
389 sentLast bool // sent an end stream
390 beginTime time.Time
391
392 methodConfig *MethodConfig
393
394 ctx context.Context // the application's context, wrapped by stats/tracing
395
396 retryThrottler *retryThrottler // The throttler active when the RPC began.
397
398 binlog *binarylog.MethodLogger // Binary logger, can be nil.
399 // serverHeaderBinlogged is a boolean for whether server header has been
400 // logged. Server header will be logged when the first time one of those
401 // happens: stream.Header(), stream.Recv().
402 //
403 // It's only read and used by Recv() and Header(), so it doesn't need to be
404 // synchronized.
405 serverHeaderBinlogged bool
406
407 mu sync.Mutex
Arjun E K57a7fcb2020-01-30 06:44:45 +0000408 firstAttempt bool // if true, transparent retry is valid
409 numRetries int // exclusive of transparent retry attempt(s)
410 numRetriesSincePushback int // retries since pushback; to reset backoff
411 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
412 // attempt is the active client stream attempt.
413 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
414 // So, attempt can be nil only inside newClientStream function when clientStream is first created.
415 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
416 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
417 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
418 // place where we need to check if the attempt is nil.
419 attempt *csAttempt
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700420 // TODO(hedging): hedging will have multiple attempts simultaneously.
421 committed bool // active attempt committed for retry?
422 buffer []func(a *csAttempt) error // operations to replay on retry
423 bufferSize int // current size of buffer
424}
425
426// csAttempt implements a single transport stream attempt within a
427// clientStream.
428type csAttempt struct {
429 cs *clientStream
430 t transport.ClientTransport
431 s *transport.Stream
432 p *parser
433 done func(balancer.DoneInfo)
434
435 finished bool
436 dc Decompressor
437 decomp encoding.Compressor
438 decompSet bool
439
440 mu sync.Mutex // guards trInfo.tr
441 // trInfo may be nil (if EnableTracing is false).
442 // trInfo.tr is set when created (if EnableTracing is true),
443 // and cleared when the finish method is called.
444 trInfo *traceInfo
445
446 statsHandler stats.Handler
447}
448
449func (cs *clientStream) commitAttemptLocked() {
450 cs.committed = true
451 cs.buffer = nil
452}
453
454func (cs *clientStream) commitAttempt() {
455 cs.mu.Lock()
456 cs.commitAttemptLocked()
457 cs.mu.Unlock()
458}
459
460// shouldRetry returns nil if the RPC should be retried; otherwise it returns
461// the error that should be returned by the operation.
462func (cs *clientStream) shouldRetry(err error) error {
463 if cs.attempt.s == nil && !cs.callInfo.failFast {
464 // In the event of any error from NewStream (attempt.s == nil), we
465 // never attempted to write anything to the wire, so we can retry
466 // indefinitely for non-fail-fast RPCs.
467 return nil
468 }
469 if cs.finished || cs.committed {
470 // RPC is finished or committed; cannot retry.
471 return err
472 }
473 // Wait for the trailers.
474 if cs.attempt.s != nil {
475 <-cs.attempt.s.Done()
476 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000477 if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
478 // First attempt, stream unprocessed: transparently retry.
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700479 cs.firstAttempt = false
480 return nil
481 }
482 cs.firstAttempt = false
483 if cs.cc.dopts.disableRetry {
484 return err
485 }
486
487 pushback := 0
488 hasPushback := false
489 if cs.attempt.s != nil {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000490 if !cs.attempt.s.TrailersOnly() {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700491 return err
492 }
493
494 // TODO(retry): Move down if the spec changes to not check server pushback
495 // before considering this a failure for throttling.
496 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
497 if len(sps) == 1 {
498 var e error
499 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
amit.ghosh258d14c2020-10-02 15:13:38 +0200500 channelz.Infof(cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700501 cs.retryThrottler.throttle() // This counts as a failure for throttling.
502 return err
503 }
504 hasPushback = true
505 } else if len(sps) > 1 {
amit.ghosh258d14c2020-10-02 15:13:38 +0200506 channelz.Warningf(cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700507 cs.retryThrottler.throttle() // This counts as a failure for throttling.
508 return err
509 }
510 }
511
512 var code codes.Code
513 if cs.attempt.s != nil {
514 code = cs.attempt.s.Status().Code()
515 } else {
516 code = status.Convert(err).Code()
517 }
518
519 rp := cs.methodConfig.retryPolicy
520 if rp == nil || !rp.retryableStatusCodes[code] {
521 return err
522 }
523
524 // Note: the ordering here is important; we count this as a failure
525 // only if the code matched a retryable code.
526 if cs.retryThrottler.throttle() {
527 return err
528 }
529 if cs.numRetries+1 >= rp.maxAttempts {
530 return err
531 }
532
533 var dur time.Duration
534 if hasPushback {
535 dur = time.Millisecond * time.Duration(pushback)
536 cs.numRetriesSincePushback = 0
537 } else {
538 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
539 cur := float64(rp.initialBackoff) * fact
540 if max := float64(rp.maxBackoff); cur > max {
541 cur = max
542 }
543 dur = time.Duration(grpcrand.Int63n(int64(cur)))
544 cs.numRetriesSincePushback++
545 }
546
547 // TODO(dfawley): we could eagerly fail here if dur puts us past the
548 // deadline, but unsure if it is worth doing.
549 t := time.NewTimer(dur)
550 select {
551 case <-t.C:
552 cs.numRetries++
553 return nil
554 case <-cs.ctx.Done():
555 t.Stop()
556 return status.FromContextError(cs.ctx.Err()).Err()
557 }
558}
559
560// Returns nil if a retry was performed and succeeded; error otherwise.
561func (cs *clientStream) retryLocked(lastErr error) error {
562 for {
563 cs.attempt.finish(lastErr)
564 if err := cs.shouldRetry(lastErr); err != nil {
565 cs.commitAttemptLocked()
566 return err
567 }
568 if err := cs.newAttemptLocked(nil, nil); err != nil {
569 return err
570 }
571 if lastErr = cs.replayBufferLocked(); lastErr == nil {
572 return nil
573 }
574 }
575}
576
577func (cs *clientStream) Context() context.Context {
578 cs.commitAttempt()
579 // No need to lock before using attempt, since we know it is committed and
580 // cannot change.
581 return cs.attempt.s.Context()
582}
583
584func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
585 cs.mu.Lock()
586 for {
587 if cs.committed {
588 cs.mu.Unlock()
589 return op(cs.attempt)
590 }
591 a := cs.attempt
592 cs.mu.Unlock()
593 err := op(a)
594 cs.mu.Lock()
595 if a != cs.attempt {
596 // We started another attempt already.
597 continue
598 }
599 if err == io.EOF {
600 <-a.s.Done()
601 }
602 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
603 onSuccess()
604 cs.mu.Unlock()
605 return err
606 }
607 if err := cs.retryLocked(err); err != nil {
608 cs.mu.Unlock()
609 return err
610 }
611 }
612}
613
614func (cs *clientStream) Header() (metadata.MD, error) {
615 var m metadata.MD
616 err := cs.withRetry(func(a *csAttempt) error {
617 var err error
618 m, err = a.s.Header()
619 return toRPCErr(err)
620 }, cs.commitAttemptLocked)
621 if err != nil {
622 cs.finish(err)
623 return nil, err
624 }
625 if cs.binlog != nil && !cs.serverHeaderBinlogged {
626 // Only log if binary log is on and header has not been logged.
627 logEntry := &binarylog.ServerHeader{
628 OnClientSide: true,
629 Header: m,
630 PeerAddr: nil,
631 }
632 if peer, ok := peer.FromContext(cs.Context()); ok {
633 logEntry.PeerAddr = peer.Addr
634 }
635 cs.binlog.Log(logEntry)
636 cs.serverHeaderBinlogged = true
637 }
638 return m, err
639}
640
641func (cs *clientStream) Trailer() metadata.MD {
642 // On RPC failure, we never need to retry, because usage requires that
643 // RecvMsg() returned a non-nil error before calling this function is valid.
644 // We would have retried earlier if necessary.
645 //
646 // Commit the attempt anyway, just in case users are not following those
647 // directions -- it will prevent races and should not meaningfully impact
648 // performance.
649 cs.commitAttempt()
650 if cs.attempt.s == nil {
651 return nil
652 }
653 return cs.attempt.s.Trailer()
654}
655
656func (cs *clientStream) replayBufferLocked() error {
657 a := cs.attempt
658 for _, f := range cs.buffer {
659 if err := f(a); err != nil {
660 return err
661 }
662 }
663 return nil
664}
665
666func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
667 // Note: we still will buffer if retry is disabled (for transparent retries).
668 if cs.committed {
669 return
670 }
671 cs.bufferSize += sz
672 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
673 cs.commitAttemptLocked()
674 return
675 }
676 cs.buffer = append(cs.buffer, op)
677}
678
679func (cs *clientStream) SendMsg(m interface{}) (err error) {
680 defer func() {
681 if err != nil && err != io.EOF {
682 // Call finish on the client stream for errors generated by this SendMsg
683 // call, as these indicate problems created by this client. (Transport
684 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
685 // error will be returned from RecvMsg eventually in that case, or be
686 // retried.)
687 cs.finish(err)
688 }
689 }()
690 if cs.sentLast {
691 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
692 }
693 if !cs.desc.ClientStreams {
694 cs.sentLast = true
695 }
696
697 // load hdr, payload, data
698 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
699 if err != nil {
700 return err
701 }
702
703 // TODO(dfawley): should we be checking len(data) instead?
704 if len(payload) > *cs.callInfo.maxSendMessageSize {
705 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
706 }
707 msgBytes := data // Store the pointer before setting to nil. For binary logging.
708 op := func(a *csAttempt) error {
709 err := a.sendMsg(m, hdr, payload, data)
710 // nil out the message and uncomp when replaying; they are only needed for
711 // stats which is disabled for subsequent attempts.
712 m, data = nil, nil
713 return err
714 }
715 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
716 if cs.binlog != nil && err == nil {
717 cs.binlog.Log(&binarylog.ClientMessage{
718 OnClientSide: true,
719 Message: msgBytes,
720 })
721 }
722 return
723}
724
725func (cs *clientStream) RecvMsg(m interface{}) error {
726 if cs.binlog != nil && !cs.serverHeaderBinlogged {
727 // Call Header() to binary log header if it's not already logged.
728 cs.Header()
729 }
730 var recvInfo *payloadInfo
731 if cs.binlog != nil {
732 recvInfo = &payloadInfo{}
733 }
734 err := cs.withRetry(func(a *csAttempt) error {
735 return a.recvMsg(m, recvInfo)
736 }, cs.commitAttemptLocked)
737 if cs.binlog != nil && err == nil {
738 cs.binlog.Log(&binarylog.ServerMessage{
739 OnClientSide: true,
740 Message: recvInfo.uncompressedBytes,
741 })
742 }
743 if err != nil || !cs.desc.ServerStreams {
744 // err != nil or non-server-streaming indicates end of stream.
745 cs.finish(err)
746
747 if cs.binlog != nil {
748 // finish will not log Trailer. Log Trailer here.
749 logEntry := &binarylog.ServerTrailer{
750 OnClientSide: true,
751 Trailer: cs.Trailer(),
752 Err: err,
753 }
754 if logEntry.Err == io.EOF {
755 logEntry.Err = nil
756 }
757 if peer, ok := peer.FromContext(cs.Context()); ok {
758 logEntry.PeerAddr = peer.Addr
759 }
760 cs.binlog.Log(logEntry)
761 }
762 }
763 return err
764}
765
766func (cs *clientStream) CloseSend() error {
767 if cs.sentLast {
768 // TODO: return an error and finish the stream instead, due to API misuse?
769 return nil
770 }
771 cs.sentLast = true
772 op := func(a *csAttempt) error {
773 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
774 // Always return nil; io.EOF is the only error that might make sense
775 // instead, but there is no need to signal the client to call RecvMsg
776 // as the only use left for the stream after CloseSend is to call
777 // RecvMsg. This also matches historical behavior.
778 return nil
779 }
780 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
781 if cs.binlog != nil {
782 cs.binlog.Log(&binarylog.ClientHalfClose{
783 OnClientSide: true,
784 })
785 }
786 // We never returned an error here for reasons.
787 return nil
788}
789
790func (cs *clientStream) finish(err error) {
791 if err == io.EOF {
792 // Ending a stream with EOF indicates a success.
793 err = nil
794 }
795 cs.mu.Lock()
796 if cs.finished {
797 cs.mu.Unlock()
798 return
799 }
800 cs.finished = true
801 cs.commitAttemptLocked()
802 cs.mu.Unlock()
803 // For binary logging. only log cancel in finish (could be caused by RPC ctx
804 // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
805 //
806 // Only one of cancel or trailer needs to be logged. In the cases where
807 // users don't call RecvMsg, users must have already canceled the RPC.
808 if cs.binlog != nil && status.Code(err) == codes.Canceled {
809 cs.binlog.Log(&binarylog.Cancel{
810 OnClientSide: true,
811 })
812 }
813 if err == nil {
814 cs.retryThrottler.successfulRPC()
815 }
816 if channelz.IsOn() {
817 if err != nil {
818 cs.cc.incrCallsFailed()
819 } else {
820 cs.cc.incrCallsSucceeded()
821 }
822 }
823 if cs.attempt != nil {
824 cs.attempt.finish(err)
Arjun E K57a7fcb2020-01-30 06:44:45 +0000825 // after functions all rely upon having a stream.
826 if cs.attempt.s != nil {
827 for _, o := range cs.opts {
828 o.after(cs.callInfo)
829 }
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700830 }
831 }
832 cs.cancel()
833}
834
835func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
836 cs := a.cs
837 if a.trInfo != nil {
838 a.mu.Lock()
839 if a.trInfo.tr != nil {
840 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
841 }
842 a.mu.Unlock()
843 }
844 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
845 if !cs.desc.ClientStreams {
846 // For non-client-streaming RPCs, we return nil instead of EOF on error
847 // because the generated code requires it. finish is not called; RecvMsg()
848 // will call it with the stream's status independently.
849 return nil
850 }
851 return io.EOF
852 }
853 if a.statsHandler != nil {
854 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
855 }
856 if channelz.IsOn() {
857 a.t.IncrMsgSent()
858 }
859 return nil
860}
861
862func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
863 cs := a.cs
864 if a.statsHandler != nil && payInfo == nil {
865 payInfo = &payloadInfo{}
866 }
867
868 if !a.decompSet {
869 // Block until we receive headers containing received message encoding.
870 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
871 if a.dc == nil || a.dc.Type() != ct {
872 // No configured decompressor, or it does not match the incoming
873 // message encoding; attempt to find a registered compressor that does.
874 a.dc = nil
875 a.decomp = encoding.GetCompressor(ct)
876 }
877 } else {
878 // No compression is used; disable our decompressor.
879 a.dc = nil
880 }
881 // Only initialize this state once per stream.
882 a.decompSet = true
883 }
884 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
885 if err != nil {
886 if err == io.EOF {
887 if statusErr := a.s.Status().Err(); statusErr != nil {
888 return statusErr
889 }
890 return io.EOF // indicates successful end of stream.
891 }
892 return toRPCErr(err)
893 }
894 if a.trInfo != nil {
895 a.mu.Lock()
896 if a.trInfo.tr != nil {
897 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
898 }
899 a.mu.Unlock()
900 }
901 if a.statsHandler != nil {
902 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
903 Client: true,
904 RecvTime: time.Now(),
905 Payload: m,
906 // TODO truncate large payload.
907 Data: payInfo.uncompressedBytes,
908 WireLength: payInfo.wireLength,
909 Length: len(payInfo.uncompressedBytes),
910 })
911 }
912 if channelz.IsOn() {
913 a.t.IncrMsgRecv()
914 }
915 if cs.desc.ServerStreams {
916 // Subsequent messages should be received by subsequent RecvMsg calls.
917 return nil
918 }
919 // Special handling for non-server-stream rpcs.
920 // This recv expects EOF or errors, so we don't collect inPayload.
921 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
922 if err == nil {
923 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
924 }
925 if err == io.EOF {
926 return a.s.Status().Err() // non-server streaming Recv returns nil on success
927 }
928 return toRPCErr(err)
929}
930
931func (a *csAttempt) finish(err error) {
932 a.mu.Lock()
933 if a.finished {
934 a.mu.Unlock()
935 return
936 }
937 a.finished = true
938 if err == io.EOF {
939 // Ending a stream with EOF indicates a success.
940 err = nil
941 }
942 var tr metadata.MD
943 if a.s != nil {
944 a.t.CloseStream(a.s, err)
945 tr = a.s.Trailer()
946 }
947
948 if a.done != nil {
949 br := false
950 if a.s != nil {
951 br = a.s.BytesReceived()
952 }
953 a.done(balancer.DoneInfo{
954 Err: err,
955 Trailer: tr,
956 BytesSent: a.s != nil,
957 BytesReceived: br,
958 ServerLoad: balancerload.Parse(tr),
959 })
960 }
961 if a.statsHandler != nil {
962 end := &stats.End{
963 Client: true,
964 BeginTime: a.cs.beginTime,
965 EndTime: time.Now(),
966 Trailer: tr,
967 Error: err,
968 }
969 a.statsHandler.HandleRPC(a.cs.ctx, end)
970 }
971 if a.trInfo != nil && a.trInfo.tr != nil {
972 if err == nil {
973 a.trInfo.tr.LazyPrintf("RPC: [OK]")
974 } else {
975 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
976 a.trInfo.tr.SetError()
977 }
978 a.trInfo.tr.Finish()
979 a.trInfo.tr = nil
980 }
981 a.mu.Unlock()
982}
983
984// newClientStream creates a ClientStream with the specified transport, on the
985// given addrConn.
986//
987// It's expected that the given transport is either the same one in addrConn, or
988// is already closed. To avoid race, transport is specified separately, instead
989// of using ac.transpot.
990//
991// Main difference between this and ClientConn.NewStream:
992// - no retry
993// - no service config (or wait for service config)
994// - no tracing or stats
995func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
996 if t == nil {
997 // TODO: return RPC error here?
998 return nil, errors.New("transport provided is nil")
999 }
1000 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1001 c := &callInfo{}
1002
1003 // Possible context leak:
1004 // The cancel function for the child context we create will only be called
1005 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1006 // an error is generated by SendMsg.
1007 // https://github.com/grpc/grpc-go/issues/1818.
1008 ctx, cancel := context.WithCancel(ctx)
1009 defer func() {
1010 if err != nil {
1011 cancel()
1012 }
1013 }()
1014
1015 for _, o := range opts {
1016 if err := o.before(c); err != nil {
1017 return nil, toRPCErr(err)
1018 }
1019 }
1020 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1021 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1022 if err := setCallInfoCodec(c); err != nil {
1023 return nil, err
1024 }
1025
1026 callHdr := &transport.CallHdr{
1027 Host: ac.cc.authority,
1028 Method: method,
1029 ContentSubtype: c.contentSubtype,
1030 }
1031
1032 // Set our outgoing compression according to the UseCompressor CallOption, if
1033 // set. In that case, also find the compressor from the encoding package.
1034 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1035 // if set.
1036 var cp Compressor
1037 var comp encoding.Compressor
1038 if ct := c.compressorType; ct != "" {
1039 callHdr.SendCompress = ct
1040 if ct != encoding.Identity {
1041 comp = encoding.GetCompressor(ct)
1042 if comp == nil {
1043 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1044 }
1045 }
1046 } else if ac.cc.dopts.cp != nil {
1047 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1048 cp = ac.cc.dopts.cp
1049 }
1050 if c.creds != nil {
1051 callHdr.Creds = c.creds
1052 }
1053
1054 // Use a special addrConnStream to avoid retry.
1055 as := &addrConnStream{
1056 callHdr: callHdr,
1057 ac: ac,
1058 ctx: ctx,
1059 cancel: cancel,
1060 opts: opts,
1061 callInfo: c,
1062 desc: desc,
1063 codec: c.codec,
1064 cp: cp,
1065 comp: comp,
1066 t: t,
1067 }
1068
1069 as.callInfo.stream = as
1070 s, err := as.t.NewStream(as.ctx, as.callHdr)
1071 if err != nil {
1072 err = toRPCErr(err)
1073 return nil, err
1074 }
1075 as.s = s
1076 as.p = &parser{r: s}
1077 ac.incrCallsStarted()
1078 if desc != unaryStreamDesc {
1079 // Listen on cc and stream contexts to cleanup when the user closes the
1080 // ClientConn or cancels the stream context. In all other cases, an error
1081 // should already be injected into the recv buffer by the transport, which
1082 // the client will eventually receive, and then we will cancel the stream's
1083 // context in clientStream.finish.
1084 go func() {
1085 select {
1086 case <-ac.ctx.Done():
1087 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1088 case <-ctx.Done():
1089 as.finish(toRPCErr(ctx.Err()))
1090 }
1091 }()
1092 }
1093 return as, nil
1094}
1095
1096type addrConnStream struct {
1097 s *transport.Stream
1098 ac *addrConn
1099 callHdr *transport.CallHdr
1100 cancel context.CancelFunc
1101 opts []CallOption
1102 callInfo *callInfo
1103 t transport.ClientTransport
1104 ctx context.Context
1105 sentLast bool
1106 desc *StreamDesc
1107 codec baseCodec
1108 cp Compressor
1109 comp encoding.Compressor
1110 decompSet bool
1111 dc Decompressor
1112 decomp encoding.Compressor
1113 p *parser
1114 mu sync.Mutex
1115 finished bool
1116}
1117
1118func (as *addrConnStream) Header() (metadata.MD, error) {
1119 m, err := as.s.Header()
1120 if err != nil {
1121 as.finish(toRPCErr(err))
1122 }
1123 return m, err
1124}
1125
1126func (as *addrConnStream) Trailer() metadata.MD {
1127 return as.s.Trailer()
1128}
1129
1130func (as *addrConnStream) CloseSend() error {
1131 if as.sentLast {
1132 // TODO: return an error and finish the stream instead, due to API misuse?
1133 return nil
1134 }
1135 as.sentLast = true
1136
1137 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1138 // Always return nil; io.EOF is the only error that might make sense
1139 // instead, but there is no need to signal the client to call RecvMsg
1140 // as the only use left for the stream after CloseSend is to call
1141 // RecvMsg. This also matches historical behavior.
1142 return nil
1143}
1144
1145func (as *addrConnStream) Context() context.Context {
1146 return as.s.Context()
1147}
1148
1149func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1150 defer func() {
1151 if err != nil && err != io.EOF {
1152 // Call finish on the client stream for errors generated by this SendMsg
1153 // call, as these indicate problems created by this client. (Transport
1154 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1155 // error will be returned from RecvMsg eventually in that case, or be
1156 // retried.)
1157 as.finish(err)
1158 }
1159 }()
1160 if as.sentLast {
1161 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1162 }
1163 if !as.desc.ClientStreams {
1164 as.sentLast = true
1165 }
1166
1167 // load hdr, payload, data
1168 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
1169 if err != nil {
1170 return err
1171 }
1172
1173 // TODO(dfawley): should we be checking len(data) instead?
1174 if len(payld) > *as.callInfo.maxSendMessageSize {
1175 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1176 }
1177
1178 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1179 if !as.desc.ClientStreams {
1180 // For non-client-streaming RPCs, we return nil instead of EOF on error
1181 // because the generated code requires it. finish is not called; RecvMsg()
1182 // will call it with the stream's status independently.
1183 return nil
1184 }
1185 return io.EOF
1186 }
1187
1188 if channelz.IsOn() {
1189 as.t.IncrMsgSent()
1190 }
1191 return nil
1192}
1193
1194func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1195 defer func() {
1196 if err != nil || !as.desc.ServerStreams {
1197 // err != nil or non-server-streaming indicates end of stream.
1198 as.finish(err)
1199 }
1200 }()
1201
1202 if !as.decompSet {
1203 // Block until we receive headers containing received message encoding.
1204 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1205 if as.dc == nil || as.dc.Type() != ct {
1206 // No configured decompressor, or it does not match the incoming
1207 // message encoding; attempt to find a registered compressor that does.
1208 as.dc = nil
1209 as.decomp = encoding.GetCompressor(ct)
1210 }
1211 } else {
1212 // No compression is used; disable our decompressor.
1213 as.dc = nil
1214 }
1215 // Only initialize this state once per stream.
1216 as.decompSet = true
1217 }
1218 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1219 if err != nil {
1220 if err == io.EOF {
1221 if statusErr := as.s.Status().Err(); statusErr != nil {
1222 return statusErr
1223 }
1224 return io.EOF // indicates successful end of stream.
1225 }
1226 return toRPCErr(err)
1227 }
1228
1229 if channelz.IsOn() {
1230 as.t.IncrMsgRecv()
1231 }
1232 if as.desc.ServerStreams {
1233 // Subsequent messages should be received by subsequent RecvMsg calls.
1234 return nil
1235 }
1236
1237 // Special handling for non-server-stream rpcs.
1238 // This recv expects EOF or errors, so we don't collect inPayload.
1239 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1240 if err == nil {
1241 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1242 }
1243 if err == io.EOF {
1244 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1245 }
1246 return toRPCErr(err)
1247}
1248
1249func (as *addrConnStream) finish(err error) {
1250 as.mu.Lock()
1251 if as.finished {
1252 as.mu.Unlock()
1253 return
1254 }
1255 as.finished = true
1256 if err == io.EOF {
1257 // Ending a stream with EOF indicates a success.
1258 err = nil
1259 }
1260 if as.s != nil {
1261 as.t.CloseStream(as.s, err)
1262 }
1263
1264 if err != nil {
1265 as.ac.incrCallsFailed()
1266 } else {
1267 as.ac.incrCallsSucceeded()
1268 }
1269 as.cancel()
1270 as.mu.Unlock()
1271}
1272
1273// ServerStream defines the server-side behavior of a streaming RPC.
1274//
1275// All errors returned from ServerStream methods are compatible with the
1276// status package.
1277type ServerStream interface {
1278 // SetHeader sets the header metadata. It may be called multiple times.
1279 // When call multiple times, all the provided metadata will be merged.
1280 // All the metadata will be sent out when one of the following happens:
1281 // - ServerStream.SendHeader() is called;
1282 // - The first response is sent out;
1283 // - An RPC status is sent out (error or success).
1284 SetHeader(metadata.MD) error
1285 // SendHeader sends the header metadata.
1286 // The provided md and headers set by SetHeader() will be sent.
1287 // It fails if called multiple times.
1288 SendHeader(metadata.MD) error
1289 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1290 // When called more than once, all the provided metadata will be merged.
1291 SetTrailer(metadata.MD)
1292 // Context returns the context for this stream.
1293 Context() context.Context
1294 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1295 // error is returned directly.
1296 //
1297 // SendMsg blocks until:
1298 // - There is sufficient flow control to schedule m with the transport, or
1299 // - The stream is done, or
1300 // - The stream breaks.
1301 //
1302 // SendMsg does not wait until the message is received by the client. An
1303 // untimely stream closure may result in lost messages.
1304 //
1305 // It is safe to have a goroutine calling SendMsg and another goroutine
1306 // calling RecvMsg on the same stream at the same time, but it is not safe
1307 // to call SendMsg on the same stream in different goroutines.
1308 SendMsg(m interface{}) error
1309 // RecvMsg blocks until it receives a message into m or the stream is
1310 // done. It returns io.EOF when the client has performed a CloseSend. On
1311 // any non-EOF error, the stream is aborted and the error contains the
1312 // RPC status.
1313 //
1314 // It is safe to have a goroutine calling SendMsg and another goroutine
1315 // calling RecvMsg on the same stream at the same time, but it is not
1316 // safe to call RecvMsg on the same stream in different goroutines.
1317 RecvMsg(m interface{}) error
1318}
1319
1320// serverStream implements a server side Stream.
1321type serverStream struct {
1322 ctx context.Context
1323 t transport.ServerTransport
1324 s *transport.Stream
1325 p *parser
1326 codec baseCodec
1327
1328 cp Compressor
1329 dc Decompressor
1330 comp encoding.Compressor
1331 decomp encoding.Compressor
1332
1333 maxReceiveMessageSize int
1334 maxSendMessageSize int
1335 trInfo *traceInfo
1336
1337 statsHandler stats.Handler
1338
1339 binlog *binarylog.MethodLogger
1340 // serverHeaderBinlogged indicates whether server header has been logged. It
1341 // will happen when one of the following two happens: stream.SendHeader(),
1342 // stream.Send().
1343 //
1344 // It's only checked in send and sendHeader, doesn't need to be
1345 // synchronized.
1346 serverHeaderBinlogged bool
1347
1348 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1349}
1350
1351func (ss *serverStream) Context() context.Context {
1352 return ss.ctx
1353}
1354
1355func (ss *serverStream) SetHeader(md metadata.MD) error {
1356 if md.Len() == 0 {
1357 return nil
1358 }
1359 return ss.s.SetHeader(md)
1360}
1361
1362func (ss *serverStream) SendHeader(md metadata.MD) error {
1363 err := ss.t.WriteHeader(ss.s, md)
1364 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1365 h, _ := ss.s.Header()
1366 ss.binlog.Log(&binarylog.ServerHeader{
1367 Header: h,
1368 })
1369 ss.serverHeaderBinlogged = true
1370 }
1371 return err
1372}
1373
1374func (ss *serverStream) SetTrailer(md metadata.MD) {
1375 if md.Len() == 0 {
1376 return
1377 }
1378 ss.s.SetTrailer(md)
1379}
1380
1381func (ss *serverStream) SendMsg(m interface{}) (err error) {
1382 defer func() {
1383 if ss.trInfo != nil {
1384 ss.mu.Lock()
1385 if ss.trInfo.tr != nil {
1386 if err == nil {
1387 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1388 } else {
1389 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1390 ss.trInfo.tr.SetError()
1391 }
1392 }
1393 ss.mu.Unlock()
1394 }
1395 if err != nil && err != io.EOF {
1396 st, _ := status.FromError(toRPCErr(err))
1397 ss.t.WriteStatus(ss.s, st)
1398 // Non-user specified status was sent out. This should be an error
1399 // case (as a server side Cancel maybe).
1400 //
1401 // This is not handled specifically now. User will return a final
1402 // status from the service handler, we will log that error instead.
1403 // This behavior is similar to an interceptor.
1404 }
1405 if channelz.IsOn() && err == nil {
1406 ss.t.IncrMsgSent()
1407 }
1408 }()
1409
1410 // load hdr, payload, data
1411 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
1412 if err != nil {
1413 return err
1414 }
1415
1416 // TODO(dfawley): should we be checking len(data) instead?
1417 if len(payload) > ss.maxSendMessageSize {
1418 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1419 }
1420 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1421 return toRPCErr(err)
1422 }
1423 if ss.binlog != nil {
1424 if !ss.serverHeaderBinlogged {
1425 h, _ := ss.s.Header()
1426 ss.binlog.Log(&binarylog.ServerHeader{
1427 Header: h,
1428 })
1429 ss.serverHeaderBinlogged = true
1430 }
1431 ss.binlog.Log(&binarylog.ServerMessage{
1432 Message: data,
1433 })
1434 }
1435 if ss.statsHandler != nil {
1436 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1437 }
1438 return nil
1439}
1440
1441func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1442 defer func() {
1443 if ss.trInfo != nil {
1444 ss.mu.Lock()
1445 if ss.trInfo.tr != nil {
1446 if err == nil {
1447 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1448 } else if err != io.EOF {
1449 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1450 ss.trInfo.tr.SetError()
1451 }
1452 }
1453 ss.mu.Unlock()
1454 }
1455 if err != nil && err != io.EOF {
1456 st, _ := status.FromError(toRPCErr(err))
1457 ss.t.WriteStatus(ss.s, st)
1458 // Non-user specified status was sent out. This should be an error
1459 // case (as a server side Cancel maybe).
1460 //
1461 // This is not handled specifically now. User will return a final
1462 // status from the service handler, we will log that error instead.
1463 // This behavior is similar to an interceptor.
1464 }
1465 if channelz.IsOn() && err == nil {
1466 ss.t.IncrMsgRecv()
1467 }
1468 }()
1469 var payInfo *payloadInfo
1470 if ss.statsHandler != nil || ss.binlog != nil {
1471 payInfo = &payloadInfo{}
1472 }
1473 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1474 if err == io.EOF {
1475 if ss.binlog != nil {
1476 ss.binlog.Log(&binarylog.ClientHalfClose{})
1477 }
1478 return err
1479 }
1480 if err == io.ErrUnexpectedEOF {
1481 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1482 }
1483 return toRPCErr(err)
1484 }
1485 if ss.statsHandler != nil {
1486 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1487 RecvTime: time.Now(),
1488 Payload: m,
1489 // TODO truncate large payload.
1490 Data: payInfo.uncompressedBytes,
1491 WireLength: payInfo.wireLength,
1492 Length: len(payInfo.uncompressedBytes),
1493 })
1494 }
1495 if ss.binlog != nil {
1496 ss.binlog.Log(&binarylog.ClientMessage{
1497 Message: payInfo.uncompressedBytes,
1498 })
1499 }
1500 return nil
1501}
1502
1503// MethodFromServerStream returns the method string for the input stream.
1504// The returned string is in the format of "/service/method".
1505func MethodFromServerStream(stream ServerStream) (string, bool) {
1506 return Method(stream.Context())
1507}
1508
1509// prepareMsg returns the hdr, payload and data
1510// using the compressors passed or using the
1511// passed preparedmsg
1512func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
1513 if preparedMsg, ok := m.(*PreparedMsg); ok {
1514 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
1515 }
1516 // The input interface is not a prepared msg.
1517 // Marshal and Compress the data at this point
1518 data, err = encode(codec, m)
1519 if err != nil {
1520 return nil, nil, nil, err
1521 }
1522 compData, err := compress(data, cp, comp)
1523 if err != nil {
1524 return nil, nil, nil, err
1525 }
1526 hdr, payload = msgHeader(data, compData)
1527 return hdr, payload, data, nil
1528}