/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// StreamHandler defines the handler called by the gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
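
// A minimal sketch of a conforming StreamHandler (editor's example, not part
// of this package's API): errors flow back through the status package so the
// client observes a well-defined code instead of codes.Unknown. The
// myRequest/myResponse message types are hypothetical placeholders.
//
//	func echoHandler(srv interface{}, stream ServerStream) error {
//		var req myRequest
//		if err := stream.RecvMsg(&req); err != nil {
//			return err
//		}
//		if req.Payload == nil {
//			// Produced by the status package, so the code survives the wire.
//			return status.Error(codes.InvalidArgument, "empty payload")
//		}
//		return stream.SendMsg(&myResponse{Payload: req.Payload})
//	}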

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}
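
// For reference, generated code registers streaming methods with descriptors
// shaped like the sketch below (hypothetical method name, reusing the
// echoHandler example above):
//
//	var echoStreamDesc = StreamDesc{
//		StreamName:    "BidiEcho",
//		Handler:       echoHandler,
//		ServerStreams: true,
//		ClientStreams: true,
//	}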

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server, if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when a non-nil error is encountered. It is also not safe to call
	// CloseSend concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	// - There is sufficient flow control to schedule m with the transport, or
	// - The stream is done, or
	// - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
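
// The SendMsg/RecvMsg contract above permits exactly one sender goroutine and
// one receiver goroutine per stream. A minimal sketch (editor's example;
// newStreamSomehow stands in for a generated-code call, and msgs/myResponse
// are hypothetical):
//
//	stream, err := newStreamSomehow(ctx)
//	if err != nil {
//		return err
//	}
//	go func() {
//		for _, m := range msgs {
//			if err := stream.SendMsg(m); err != nil {
//				return // the real status will surface via RecvMsg
//			}
//		}
//		stream.CloseSend() // safe here: the sender goroutine has stopped sending
//	}()
//	for {
//		var resp myResponse
//		if err := stream.RecvMsg(&resp); err != nil {
//			if err == io.EOF {
//				return nil // clean end of stream
//			}
//			return err // aborted; err carries the RPC status
//		}
//	}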

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//  1. Call Close on the ClientConn.
//  2. Cancel the context provided.
//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//     client-streaming RPC, for instance, might use the helper function
//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
//     guaranteed to release all resources).
//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// Allow the interceptor to see all applicable call options, which means
	// those configured as defaults from the dial options as well as per-call
	// options.
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}
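
// Cleanup rule 3 above in practice (editor's sketch; desc, method, req, and
// myResponse are hypothetical): cancel the context on every exit path and
// drain with RecvMsg so the stream's goroutine and context are released.
//
//	ctx, cancel := context.WithCancel(ctx)
//	defer cancel() // satisfies rule 2 even on early returns
//	stream, err := cc.NewStream(ctx, desc, method)
//	if err != nil {
//		return err
//	}
//	if err := stream.SendMsg(req); err != nil && err != io.EOF {
//		return err // rule 4: non-nil, non-io.EOF error from SendMsg
//	}
//	if err := stream.CloseSend(); err != nil {
//		return err
//	}
//	for {
//		var resp myResponse
//		if err := stream.RecvMsg(&resp); err != nil {
//			if err == io.EOF {
//				return nil
//			}
//			return err // rule 3: RecvMsg returned a non-nil error
//		}
//	}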

// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set. (See the sketch after this function for UseCompressor in action.)
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo *traceInfo
	if EnableTracing {
		trInfo = &traceInfo{
			tr: trace.New("grpc.Sent."+methodFamily(method), method),
			firstLine: firstLine{
				client: true,
			},
		}
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = time.Until(deadline)
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
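
// UseCompressor, referenced above, is the per-call knob that drives the first
// branch of that compression logic. A hedged sketch from the caller's side:
// the gzip subpackage must be imported for its side effect of registering the
// "gzip" compressor, so encoding.GetCompressor can find it.
//
//	import _ "google.golang.org/grpc/encoding/gzip" // registers "gzip"
//
//	err := cc.Invoke(ctx, method, req, resp, grpc.UseCompressor("gzip"))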

func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
	cs.attempt = &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}

	if err := cs.ctx.Err(); err != nil {
		return toRPCErr(err)
	}
	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	if trInfo != nil {
		trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
	}
	cs.attempt.t = t
	cs.attempt.done = done
	return nil
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		return toRPCErr(err)
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

// clientStream implements a client-side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlog *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged is a boolean for whether the server header has
	// been logged. The server header will be logged the first time one of the
	// following happens: stream.Header() or stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser
	done func(balancer.DoneInfo)

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandler stats.Handler
}

func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}
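
// The retry delay computed above is drawn uniformly from
// [0, min(initialBackoff*multiplier^n, maxBackoff)), where n is the number of
// retries since the last server pushback. A standalone restatement of that
// arithmetic (editor's sketch using math/rand instead of internal grpcrand;
// initial must be > 0, as in a validated retry policy):
//
//	func retryBackoff(n int, initial, max time.Duration, multiplier float64) time.Duration {
//		cur := float64(initial) * math.Pow(multiplier, float64(n))
//		if m := float64(max); cur > m {
//			cur = m
//		}
//		return time.Duration(rand.Int63n(int64(cur)))
//	}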

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		if err := cs.newAttemptLocked(nil, nil); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}

func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	return cs.attempt.s.Context()
}

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
		return nil, err
	}
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Only log if binary log is on and header has not been logged.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.binlog.Log(logEntry)
		cs.serverHeaderBinlogged = true
	}
	return m, err
}

func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry, because it is only valid to call
	// this function after RecvMsg() has returned a non-nil error. We would
	// have retried earlier if necessary.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}

func (cs *clientStream) replayBufferLocked() error {
	a := cs.attempt
	for _, f := range cs.buffer {
		if err := f(a); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we will still buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}

	// Load hdr, payload, and data.
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	return
}

func (cs *clientStream) RecvMsg(m interface{}) error {
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if cs.binlog != nil {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		})
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if cs.binlog != nil {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			cs.binlog.Log(logEntry)
		}
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if cs.binlog != nil {
		cs.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
	}
	// We intentionally never return an error here, to match historical
	// behavior (see the comment in op above).
	return nil
}

func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	// For binary logging: only log cancel in finish (could be caused by the
	// RPC ctx being canceled or the ClientConn being closed). Trailer will be
	// logged in RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if cs.binlog != nil && status.Code(err) == codes.Canceled {
		cs.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	if cs.attempt != nil {
		cs.attempt.finish(err)
	}
	// The after functions all rely upon having a stream.
	if cs.attempt.s != nil {
		for _, o := range cs.opts {
			o.after(cs.callInfo)
		}
	}
	cs.cancel()
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing the received-message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-streaming RPCs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	var tr metadata.MD
	if a.s != nil {
		a.t.CloseStream(a.s, err)
		tr = a.s.Trailer()
	}

	if a.done != nil {
		br := false
		if a.s != nil {
			br = a.s.BytesReceived()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
			ServerLoad:    balancerload.Parse(tr),
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Trailer:   tr,
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo != nil && a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transport.
//
// Main differences between this and ClientConn.NewStream:
// - no retry
// - no service config (or wait for service config)
// - no tracing or stats
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (e.g. failFast and
	// maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	as.callInfo.stream = as
	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-ac.ctx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}

type addrConnStream struct {
	s         *transport.Stream
	ac        *addrConn
	callHdr   *transport.CallHdr
	cancel    context.CancelFunc
	opts      []CallOption
	callInfo  *callInfo
	t         transport.ClientTransport
	ctx       context.Context
	sentLast  bool
	desc      *StreamDesc
	codec     baseCodec
	cp        Compressor
	comp      encoding.Compressor
	decompSet bool
	dc        Decompressor
	decomp    encoding.Compressor
	p         *parser
	mu        sync.Mutex
	finished  bool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.s.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}

func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true

	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to signal the client to call RecvMsg
	// as the only use left for the stream after CloseSend is to call
	// RecvMsg. This also matches historical behavior.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}

func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}

	// Load hdr, payload, and data.
	hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}

func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing the received-message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		// Only initialize this state once per stream.
		as.decompSet = true
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-streaming RPCs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if as.s != nil {
		as.t.CloseStream(as.s, err)
	}

	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	// - ServerStream.SendHeader() is called;
	// - The first response is sent out;
	// - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	// - There is sufficient flow control to schedule m with the transport, or
	// - The stream is done, or
	// - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
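
// How the header/trailer rules above combine in a handler (editor's sketch;
// the myRequest/myResponse types are hypothetical):
//
//	func bidiEcho(srv interface{}, stream ServerStream) error {
//		// Merged into the headers that go out with the first response.
//		stream.SetHeader(metadata.Pairs("handler", "bidiEcho"))
//		for {
//			var req myRequest
//			if err := stream.RecvMsg(&req); err != nil {
//				if err == io.EOF { // client called CloseSend
//					stream.SetTrailer(metadata.Pairs("status", "drained"))
//					return nil
//				}
//				return err
//			}
//			if err := stream.SendMsg(&myResponse{Payload: req.Payload}); err != nil {
//				return err
//			}
//		}
//	}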

// serverStream implements a server-side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether the server header has been
	// logged. It will happen when one of the following two happens:
	// stream.SendHeader() or stream.Send().
	//
	// It's only checked in send and sendHeader, so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := ss.t.WriteHeader(ss.s, md)
	if ss.binlog != nil && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		ss.binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		ss.serverHeaderBinlogged = true
	}
	return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	ss.s.SetTrailer(md)
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// A non-user-specified status was sent out. This should be an error
			// case (as a server-side Cancel may be).
			//
			// This is not handled specifically now. The user will return a final
			// status from the service handler, and we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()

	// Load hdr, payload, and data.
	hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
	if err != nil {
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.binlog != nil {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			ss.binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			ss.serverHeaderBinlogged = true
		}
		ss.binlog.Log(&binarylog.ServerMessage{
			Message: data,
		})
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// A non-user-specified status was sent out. This should be an error
			// case (as a server-side Cancel may be).
			//
			// This is not handled specifically now. The user will return a final
			// status from the service handler, and we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var payInfo *payloadInfo
	if ss.statsHandler != nil || ss.binlog != nil {
		payInfo = &payloadInfo{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
		if err == io.EOF {
			if ss.binlog != nil {
				ss.binlog.Log(&binarylog.ClientHalfClose{})
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:       payInfo.uncompressedBytes,
			WireLength: payInfo.wireLength,
			Length:     len(payInfo.uncompressedBytes),
		})
	}
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes,
		})
	}
	return nil
}

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
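
// MethodFromServerStream is handy in stream interceptors, which receive the
// stream but not the method name as a direct argument. A hedged sketch of
// such an interceptor (the signature matches StreamServerInterceptor; the
// logging is purely illustrative):
//
//	func loggingInterceptor(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
//		if m, ok := MethodFromServerStream(ss); ok {
//			grpclog.Infof("streaming call: %s", m)
//		}
//		return handler(srv, ss)
//	}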

// prepareMsg returns the hdr, payload, and data for m using the compressors
// passed, or, if m is already a *PreparedMsg, its precomputed fields.
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
	if preparedMsg, ok := m.(*PreparedMsg); ok {
		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
	}
	// The input interface is not a prepared msg.
	// Marshal and compress the data at this point.
	data, err = encode(codec, m)
	if err != nil {
		return nil, nil, nil, err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		return nil, nil, nil, err
	}
	hdr, payload = msgHeader(data, compData)
	return hdr, payload, data, nil
}
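
// The *PreparedMsg fast path above lets callers pay the marshal/compress cost
// once and resend the encoded bytes many times. A hedged sketch of the caller
// side (PreparedMsg.Encode is the public entry point; stream and msg are
// assumed to exist):
//
//	prepared := &grpc.PreparedMsg{}
//	if err := prepared.Encode(stream, msg); err != nil {
//		return err
//	}
//	// SendMsg detects *PreparedMsg and skips re-encoding.
//	if err := stream.SendMsg(prepared); err != nil {
//		return err
//	}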