blob: 10092685b2283467675364465910904897266299 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "io"
25 "math"
26 "strconv"
27 "sync"
28 "time"
29
30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/encoding"
34 "google.golang.org/grpc/internal/balancerload"
35 "google.golang.org/grpc/internal/binarylog"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/internal/grpcrand"
38 "google.golang.org/grpc/internal/grpcutil"
Akash Kankanala761955c2024-02-21 19:32:20 +053039 imetadata "google.golang.org/grpc/internal/metadata"
khenaidoo5fc5cea2021-08-11 17:39:16 -040040 iresolver "google.golang.org/grpc/internal/resolver"
41 "google.golang.org/grpc/internal/serviceconfig"
Akash Kankanala761955c2024-02-21 19:32:20 +053042 istatus "google.golang.org/grpc/internal/status"
khenaidoo5fc5cea2021-08-11 17:39:16 -040043 "google.golang.org/grpc/internal/transport"
44 "google.golang.org/grpc/metadata"
45 "google.golang.org/grpc/peer"
46 "google.golang.org/grpc/stats"
47 "google.golang.org/grpc/status"
48)
49
50// StreamHandler defines the handler called by gRPC server to complete the
Akash Kankanala761955c2024-02-21 19:32:20 +053051// execution of a streaming RPC.
52//
53// If a StreamHandler returns an error, it should either be produced by the
54// status package, or be one of the context errors. Otherwise, gRPC will use
55// codes.Unknown as the status code and err.Error() as the status message of the
56// RPC.
khenaidoo5fc5cea2021-08-11 17:39:16 -040057type StreamHandler func(srv interface{}, stream ServerStream) error
58
59// StreamDesc represents a streaming RPC service's method specification. Used
60// on the server when registering services and on the client when initiating
61// new streams.
62type StreamDesc struct {
63 // StreamName and Handler are only used when registering handlers on a
64 // server.
65 StreamName string // the name of the method excluding the service
66 Handler StreamHandler // the handler called for the method
67
68 // ServerStreams and ClientStreams are used for registering handlers on a
69 // server as well as defining RPC behavior when passed to NewClientStream
70 // and ClientConn.NewStream. At least one must be true.
71 ServerStreams bool // indicates the server can perform streaming sends
72 ClientStreams bool // indicates the client can perform streaming sends
73}
74
75// Stream defines the common interface a client or server stream has to satisfy.
76//
77// Deprecated: See ClientStream and ServerStream documentation instead.
78type Stream interface {
79 // Deprecated: See ClientStream and ServerStream documentation instead.
80 Context() context.Context
81 // Deprecated: See ClientStream and ServerStream documentation instead.
82 SendMsg(m interface{}) error
83 // Deprecated: See ClientStream and ServerStream documentation instead.
84 RecvMsg(m interface{}) error
85}
86
87// ClientStream defines the client-side behavior of a streaming RPC.
88//
89// All errors returned from ClientStream methods are compatible with the
90// status package.
91type ClientStream interface {
92 // Header returns the header metadata received from the server if there
93 // is any. It blocks if the metadata is not ready to read.
94 Header() (metadata.MD, error)
95 // Trailer returns the trailer metadata from the server, if there is any.
96 // It must only be called after stream.CloseAndRecv has returned, or
97 // stream.Recv has returned a non-nil error (including io.EOF).
98 Trailer() metadata.MD
99 // CloseSend closes the send direction of the stream. It closes the stream
100 // when non-nil error is met. It is also not safe to call CloseSend
101 // concurrently with SendMsg.
102 CloseSend() error
103 // Context returns the context for this stream.
104 //
105 // It should not be called until after Header or RecvMsg has returned. Once
106 // called, subsequent client-side retries are disabled.
107 Context() context.Context
108 // SendMsg is generally called by generated code. On error, SendMsg aborts
109 // the stream. If the error was generated by the client, the status is
110 // returned directly; otherwise, io.EOF is returned and the status of
111 // the stream may be discovered using RecvMsg.
112 //
113 // SendMsg blocks until:
114 // - There is sufficient flow control to schedule m with the transport, or
115 // - The stream is done, or
116 // - The stream breaks.
117 //
118 // SendMsg does not wait until the message is received by the server. An
119 // untimely stream closure may result in lost messages. To ensure delivery,
120 // users should ensure the RPC completed successfully using RecvMsg.
121 //
122 // It is safe to have a goroutine calling SendMsg and another goroutine
123 // calling RecvMsg on the same stream at the same time, but it is not safe
124 // to call SendMsg on the same stream in different goroutines. It is also
125 // not safe to call CloseSend concurrently with SendMsg.
Akash Kankanala761955c2024-02-21 19:32:20 +0530126 //
127 // It is not safe to modify the message after calling SendMsg. Tracing
128 // libraries and stats handlers may use the message lazily.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400129 SendMsg(m interface{}) error
130 // RecvMsg blocks until it receives a message into m or the stream is
131 // done. It returns io.EOF when the stream completes successfully. On
132 // any other error, the stream is aborted and the error contains the RPC
133 // status.
134 //
135 // It is safe to have a goroutine calling SendMsg and another goroutine
136 // calling RecvMsg on the same stream at the same time, but it is not
137 // safe to call RecvMsg on the same stream in different goroutines.
138 RecvMsg(m interface{}) error
139}
140
141// NewStream creates a new Stream for the client side. This is typically
142// called by generated code. ctx is used for the lifetime of the stream.
143//
144// To ensure resources are not leaked due to the stream returned, one of the following
145// actions must be performed:
146//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500147// 1. Call Close on the ClientConn.
148// 2. Cancel the context provided.
149// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
150// client-streaming RPC, for instance, might use the helper function
151// CloseAndRecv (note that CloseSend does not Recv, therefore is not
152// guaranteed to release all resources).
153// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400154//
155// If none of the above happen, a goroutine and a context will be leaked, and grpc
156// will not call the optionally-configured stats handler with a stats.End message.
157func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530158 if err := cc.idlenessMgr.onCallBegin(); err != nil {
159 return nil, err
160 }
161 defer cc.idlenessMgr.onCallEnd()
162
khenaidoo5fc5cea2021-08-11 17:39:16 -0400163 // allow interceptor to see all applicable call options, which means those
164 // configured as defaults from dial option as well as per-call options
165 opts = combine(cc.dopts.callOptions, opts)
166
167 if cc.dopts.streamInt != nil {
168 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
169 }
170 return newClientStream(ctx, desc, cc, method, opts...)
171}
172
173// NewClientStream is a wrapper for ClientConn.NewStream.
174func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
175 return cc.NewStream(ctx, desc, method, opts...)
176}
177
178func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530179 if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
180 // validate md
181 if err := imetadata.Validate(md); err != nil {
182 return nil, status.Error(codes.Internal, err.Error())
183 }
184 // validate added
185 for _, kvs := range added {
186 for i := 0; i < len(kvs); i += 2 {
187 if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
188 return nil, status.Error(codes.Internal, err.Error())
189 }
190 }
191 }
192 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400193 if channelz.IsOn() {
194 cc.incrCallsStarted()
195 defer func() {
196 if err != nil {
197 cc.incrCallsFailed()
198 }
199 }()
200 }
201 // Provide an opportunity for the first RPC to see the first service config
202 // provided by the resolver.
203 if err := cc.waitForResolvedAddrs(ctx); err != nil {
204 return nil, err
205 }
206
207 var mc serviceconfig.MethodConfig
208 var onCommit func()
209 var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
210 return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
211 }
212
213 rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
214 rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
215 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530216 if st, ok := status.FromError(err); ok {
217 // Restrict the code to the list allowed by gRFC A54.
218 if istatus.IsRestrictedControlPlaneCode(st) {
219 err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
220 }
221 return nil, err
222 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400223 return nil, toRPCErr(err)
224 }
225
226 if rpcConfig != nil {
227 if rpcConfig.Context != nil {
228 ctx = rpcConfig.Context
229 }
230 mc = rpcConfig.MethodConfig
231 onCommit = rpcConfig.OnCommitted
232 if rpcConfig.Interceptor != nil {
233 rpcInfo.Context = nil
234 ns := newStream
235 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
236 cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
237 if err != nil {
238 return nil, toRPCErr(err)
239 }
240 return cs, nil
241 }
242 }
243 }
244
245 return newStream(ctx, func() {})
246}
247
248func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
249 c := defaultCallInfo()
250 if mc.WaitForReady != nil {
251 c.failFast = !*mc.WaitForReady
252 }
253
254 // Possible context leak:
255 // The cancel function for the child context we create will only be called
256 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
257 // an error is generated by SendMsg.
258 // https://github.com/grpc/grpc-go/issues/1818.
259 var cancel context.CancelFunc
260 if mc.Timeout != nil && *mc.Timeout >= 0 {
261 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
262 } else {
263 ctx, cancel = context.WithCancel(ctx)
264 }
265 defer func() {
266 if err != nil {
267 cancel()
268 }
269 }()
270
271 for _, o := range opts {
272 if err := o.before(c); err != nil {
273 return nil, toRPCErr(err)
274 }
275 }
276 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
277 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
278 if err := setCallInfoCodec(c); err != nil {
279 return nil, err
280 }
281
282 callHdr := &transport.CallHdr{
283 Host: cc.authority,
284 Method: method,
285 ContentSubtype: c.contentSubtype,
286 DoneFunc: doneFunc,
287 }
288
289 // Set our outgoing compression according to the UseCompressor CallOption, if
290 // set. In that case, also find the compressor from the encoding package.
291 // Otherwise, use the compressor configured by the WithCompressor DialOption,
292 // if set.
293 var cp Compressor
294 var comp encoding.Compressor
295 if ct := c.compressorType; ct != "" {
296 callHdr.SendCompress = ct
297 if ct != encoding.Identity {
298 comp = encoding.GetCompressor(ct)
299 if comp == nil {
300 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
301 }
302 }
303 } else if cc.dopts.cp != nil {
304 callHdr.SendCompress = cc.dopts.cp.Type()
305 cp = cc.dopts.cp
306 }
307 if c.creds != nil {
308 callHdr.Creds = c.creds
309 }
310
311 cs := &clientStream{
312 callHdr: callHdr,
313 ctx: ctx,
314 methodConfig: &mc,
315 opts: opts,
316 callInfo: c,
317 cc: cc,
318 desc: desc,
319 codec: c.codec,
320 cp: cp,
321 comp: comp,
322 cancel: cancel,
323 firstAttempt: true,
324 onCommit: onCommit,
325 }
326 if !cc.dopts.disableRetry {
327 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
328 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530329 if ml := binarylog.GetMethodLogger(method); ml != nil {
330 cs.binlogs = append(cs.binlogs, ml)
331 }
332 if cc.dopts.binaryLogger != nil {
333 if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
334 cs.binlogs = append(cs.binlogs, ml)
335 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400336 }
337
Akash Kankanala761955c2024-02-21 19:32:20 +0530338 // Pick the transport to use and create a new stream on the transport.
339 // Assign cs.attempt upon success.
340 op := func(a *csAttempt) error {
341 if err := a.getTransport(); err != nil {
342 return err
343 }
344 if err := a.newStream(); err != nil {
345 return err
346 }
347 // Because this operation is always called either here (while creating
348 // the clientStream) or by the retry code while locked when replaying
349 // the operation, it is safe to access cs.attempt directly.
350 cs.attempt = a
351 return nil
352 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400353 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400354 return nil, err
355 }
356
Akash Kankanala761955c2024-02-21 19:32:20 +0530357 if len(cs.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400358 md, _ := metadata.FromOutgoingContext(ctx)
359 logEntry := &binarylog.ClientHeader{
360 OnClientSide: true,
361 Header: md,
362 MethodName: method,
363 Authority: cs.cc.authority,
364 }
365 if deadline, ok := ctx.Deadline(); ok {
366 logEntry.Timeout = time.Until(deadline)
367 if logEntry.Timeout < 0 {
368 logEntry.Timeout = 0
369 }
370 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530371 for _, binlog := range cs.binlogs {
372 binlog.Log(cs.ctx, logEntry)
373 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400374 }
375
376 if desc != unaryStreamDesc {
377 // Listen on cc and stream contexts to cleanup when the user closes the
378 // ClientConn or cancels the stream context. In all other cases, an error
379 // should already be injected into the recv buffer by the transport, which
380 // the client will eventually receive, and then we will cancel the stream's
381 // context in clientStream.finish.
382 go func() {
383 select {
384 case <-cc.ctx.Done():
385 cs.finish(ErrClientConnClosing)
386 case <-ctx.Done():
387 cs.finish(toRPCErr(ctx.Err()))
388 }
389 }()
390 }
391 return cs, nil
392}
393
Akash Kankanala761955c2024-02-21 19:32:20 +0530394// newAttemptLocked creates a new csAttempt without a transport or stream.
395func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
396 if err := cs.ctx.Err(); err != nil {
397 return nil, toRPCErr(err)
398 }
399 if err := cs.cc.ctx.Err(); err != nil {
400 return nil, ErrClientConnClosing
401 }
402
khenaidoo5fc5cea2021-08-11 17:39:16 -0400403 ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
404 method := cs.callHdr.Method
khenaidoo5fc5cea2021-08-11 17:39:16 -0400405 var beginTime time.Time
Akash Kankanala761955c2024-02-21 19:32:20 +0530406 shs := cs.cc.dopts.copts.StatsHandlers
407 for _, sh := range shs {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400408 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
409 beginTime = time.Now()
410 begin := &stats.Begin{
411 Client: true,
412 BeginTime: beginTime,
413 FailFast: cs.callInfo.failFast,
414 IsClientStream: cs.desc.ClientStreams,
415 IsServerStream: cs.desc.ServerStreams,
416 IsTransparentRetryAttempt: isTransparent,
417 }
418 sh.HandleRPC(ctx, begin)
419 }
420
421 var trInfo *traceInfo
422 if EnableTracing {
423 trInfo = &traceInfo{
424 tr: trace.New("grpc.Sent."+methodFamily(method), method),
425 firstLine: firstLine{
426 client: true,
427 },
428 }
429 if deadline, ok := ctx.Deadline(); ok {
430 trInfo.firstLine.deadline = time.Until(deadline)
431 }
432 trInfo.tr.LazyLog(&trInfo.firstLine, false)
433 ctx = trace.NewContext(ctx, trInfo.tr)
434 }
435
Akash Kankanala761955c2024-02-21 19:32:20 +0530436 if cs.cc.parsedTarget.URL.Scheme == "xds" {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400437 // Add extra metadata (metadata that will be added by transport) to context
438 // so the balancer can see them.
439 ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
440 "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
441 ))
442 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530443
444 return &csAttempt{
445 ctx: ctx,
446 beginTime: beginTime,
447 cs: cs,
448 dc: cs.cc.dopts.dc,
449 statsHandlers: shs,
450 trInfo: trInfo,
451 }, nil
452}
453
454func (a *csAttempt) getTransport() error {
455 cs := a.cs
456
457 var err error
458 a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400459 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530460 if de, ok := err.(dropError); ok {
461 err = de.error
462 a.drop = true
463 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400464 return err
465 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530466 if a.trInfo != nil {
467 a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
khenaidoo5fc5cea2021-08-11 17:39:16 -0400468 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400469 return nil
470}
471
472func (a *csAttempt) newStream() error {
473 cs := a.cs
474 cs.callHdr.PreviousAttempts = cs.numRetries
Akash Kankanala761955c2024-02-21 19:32:20 +0530475
476 // Merge metadata stored in PickResult, if any, with existing call metadata.
477 // It is safe to overwrite the csAttempt's context here, since all state
478 // maintained in it are local to the attempt. When the attempt has to be
479 // retried, a new instance of csAttempt will be created.
480 if a.pickResult.Metadata != nil {
481 // We currently do not have a function it the metadata package which
482 // merges given metadata with existing metadata in a context. Existing
483 // function `AppendToOutgoingContext()` takes a variadic argument of key
484 // value pairs.
485 //
486 // TODO: Make it possible to retrieve key value pairs from metadata.MD
487 // in a form passable to AppendToOutgoingContext(), or create a version
488 // of AppendToOutgoingContext() that accepts a metadata.MD.
489 md, _ := metadata.FromOutgoingContext(a.ctx)
490 md = metadata.Join(md, a.pickResult.Metadata)
491 a.ctx = metadata.NewOutgoingContext(a.ctx, md)
492 }
493
khenaidoo5fc5cea2021-08-11 17:39:16 -0400494 s, err := a.t.NewStream(a.ctx, cs.callHdr)
495 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530496 nse, ok := err.(*transport.NewStreamError)
497 if !ok {
498 // Unexpected.
499 return err
500 }
501
502 if nse.AllowTransparentRetry {
503 a.allowTransparentRetry = true
504 }
505
506 // Unwrap and convert error.
507 return toRPCErr(nse.Err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400508 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530509 a.s = s
510 a.p = &parser{r: s}
khenaidoo5fc5cea2021-08-11 17:39:16 -0400511 return nil
512}
513
514// clientStream implements a client side Stream.
515type clientStream struct {
516 callHdr *transport.CallHdr
517 opts []CallOption
518 callInfo *callInfo
519 cc *ClientConn
520 desc *StreamDesc
521
522 codec baseCodec
523 cp Compressor
524 comp encoding.Compressor
525
526 cancel context.CancelFunc // cancels all attempts
527
528 sentLast bool // sent an end stream
529
530 methodConfig *MethodConfig
531
532 ctx context.Context // the application's context, wrapped by stats/tracing
533
534 retryThrottler *retryThrottler // The throttler active when the RPC began.
535
Akash Kankanala761955c2024-02-21 19:32:20 +0530536 binlogs []binarylog.MethodLogger
khenaidoo5fc5cea2021-08-11 17:39:16 -0400537 // serverHeaderBinlogged is a boolean for whether server header has been
538 // logged. Server header will be logged when the first time one of those
539 // happens: stream.Header(), stream.Recv().
540 //
541 // It's only read and used by Recv() and Header(), so it doesn't need to be
542 // synchronized.
543 serverHeaderBinlogged bool
544
545 mu sync.Mutex
546 firstAttempt bool // if true, transparent retry is valid
547 numRetries int // exclusive of transparent retry attempt(s)
548 numRetriesSincePushback int // retries since pushback; to reset backoff
549 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
550 // attempt is the active client stream attempt.
551 // The only place where it is written is the newAttemptLocked method and this method never writes nil.
552 // So, attempt can be nil only inside newClientStream function when clientStream is first created.
553 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
554 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
555 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
556 // place where we need to check if the attempt is nil.
557 attempt *csAttempt
558 // TODO(hedging): hedging will have multiple attempts simultaneously.
559 committed bool // active attempt committed for retry?
560 onCommit func()
561 buffer []func(a *csAttempt) error // operations to replay on retry
562 bufferSize int // current size of buffer
563}
564
565// csAttempt implements a single transport stream attempt within a
566// clientStream.
567type csAttempt struct {
Akash Kankanala761955c2024-02-21 19:32:20 +0530568 ctx context.Context
569 cs *clientStream
570 t transport.ClientTransport
571 s *transport.Stream
572 p *parser
573 pickResult balancer.PickResult
khenaidoo5fc5cea2021-08-11 17:39:16 -0400574
575 finished bool
576 dc Decompressor
577 decomp encoding.Compressor
578 decompSet bool
579
580 mu sync.Mutex // guards trInfo.tr
581 // trInfo may be nil (if EnableTracing is false).
582 // trInfo.tr is set when created (if EnableTracing is true),
583 // and cleared when the finish method is called.
584 trInfo *traceInfo
585
Akash Kankanala761955c2024-02-21 19:32:20 +0530586 statsHandlers []stats.Handler
587 beginTime time.Time
588
589 // set for newStream errors that may be transparently retried
590 allowTransparentRetry bool
591 // set for pick errors that are returned as a status
592 drop bool
khenaidoo5fc5cea2021-08-11 17:39:16 -0400593}
594
595func (cs *clientStream) commitAttemptLocked() {
596 if !cs.committed && cs.onCommit != nil {
597 cs.onCommit()
598 }
599 cs.committed = true
600 cs.buffer = nil
601}
602
603func (cs *clientStream) commitAttempt() {
604 cs.mu.Lock()
605 cs.commitAttemptLocked()
606 cs.mu.Unlock()
607}
608
609// shouldRetry returns nil if the RPC should be retried; otherwise it returns
610// the error that should be returned by the operation. If the RPC should be
611// retried, the bool indicates whether it is being retried transparently.
Akash Kankanala761955c2024-02-21 19:32:20 +0530612func (a *csAttempt) shouldRetry(err error) (bool, error) {
613 cs := a.cs
khenaidoo5fc5cea2021-08-11 17:39:16 -0400614
Akash Kankanala761955c2024-02-21 19:32:20 +0530615 if cs.finished || cs.committed || a.drop {
616 // RPC is finished or committed or was dropped by the picker; cannot retry.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400617 return false, err
618 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530619 if a.s == nil && a.allowTransparentRetry {
620 return true, nil
621 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400622 // Wait for the trailers.
623 unprocessed := false
Akash Kankanala761955c2024-02-21 19:32:20 +0530624 if a.s != nil {
625 <-a.s.Done()
626 unprocessed = a.s.Unprocessed()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400627 }
628 if cs.firstAttempt && unprocessed {
629 // First attempt, stream unprocessed: transparently retry.
630 return true, nil
631 }
632 if cs.cc.dopts.disableRetry {
633 return false, err
634 }
635
636 pushback := 0
637 hasPushback := false
Akash Kankanala761955c2024-02-21 19:32:20 +0530638 if a.s != nil {
639 if !a.s.TrailersOnly() {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400640 return false, err
641 }
642
643 // TODO(retry): Move down if the spec changes to not check server pushback
644 // before considering this a failure for throttling.
Akash Kankanala761955c2024-02-21 19:32:20 +0530645 sps := a.s.Trailer()["grpc-retry-pushback-ms"]
khenaidoo5fc5cea2021-08-11 17:39:16 -0400646 if len(sps) == 1 {
647 var e error
648 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
649 channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
650 cs.retryThrottler.throttle() // This counts as a failure for throttling.
651 return false, err
652 }
653 hasPushback = true
654 } else if len(sps) > 1 {
655 channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
656 cs.retryThrottler.throttle() // This counts as a failure for throttling.
657 return false, err
658 }
659 }
660
661 var code codes.Code
Akash Kankanala761955c2024-02-21 19:32:20 +0530662 if a.s != nil {
663 code = a.s.Status().Code()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400664 } else {
Akash Kankanala761955c2024-02-21 19:32:20 +0530665 code = status.Code(err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400666 }
667
668 rp := cs.methodConfig.RetryPolicy
669 if rp == nil || !rp.RetryableStatusCodes[code] {
670 return false, err
671 }
672
673 // Note: the ordering here is important; we count this as a failure
674 // only if the code matched a retryable code.
675 if cs.retryThrottler.throttle() {
676 return false, err
677 }
678 if cs.numRetries+1 >= rp.MaxAttempts {
679 return false, err
680 }
681
682 var dur time.Duration
683 if hasPushback {
684 dur = time.Millisecond * time.Duration(pushback)
685 cs.numRetriesSincePushback = 0
686 } else {
687 fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
688 cur := float64(rp.InitialBackoff) * fact
689 if max := float64(rp.MaxBackoff); cur > max {
690 cur = max
691 }
692 dur = time.Duration(grpcrand.Int63n(int64(cur)))
693 cs.numRetriesSincePushback++
694 }
695
696 // TODO(dfawley): we could eagerly fail here if dur puts us past the
697 // deadline, but unsure if it is worth doing.
698 t := time.NewTimer(dur)
699 select {
700 case <-t.C:
701 cs.numRetries++
702 return false, nil
703 case <-cs.ctx.Done():
704 t.Stop()
705 return false, status.FromContextError(cs.ctx.Err()).Err()
706 }
707}
708
709// Returns nil if a retry was performed and succeeded; error otherwise.
Akash Kankanala761955c2024-02-21 19:32:20 +0530710func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400711 for {
Akash Kankanala761955c2024-02-21 19:32:20 +0530712 attempt.finish(toRPCErr(lastErr))
713 isTransparent, err := attempt.shouldRetry(lastErr)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400714 if err != nil {
715 cs.commitAttemptLocked()
716 return err
717 }
718 cs.firstAttempt = false
Akash Kankanala761955c2024-02-21 19:32:20 +0530719 attempt, err = cs.newAttemptLocked(isTransparent)
720 if err != nil {
721 // Only returns error if the clientconn is closed or the context of
722 // the stream is canceled.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400723 return err
724 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530725 // Note that the first op in the replay buffer always sets cs.attempt
726 // if it is able to pick a transport and create a stream.
727 if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400728 return nil
729 }
730 }
731}
732
733func (cs *clientStream) Context() context.Context {
734 cs.commitAttempt()
735 // No need to lock before using attempt, since we know it is committed and
736 // cannot change.
Akash Kankanala761955c2024-02-21 19:32:20 +0530737 if cs.attempt.s != nil {
738 return cs.attempt.s.Context()
739 }
740 return cs.ctx
khenaidoo5fc5cea2021-08-11 17:39:16 -0400741}
742
743func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
744 cs.mu.Lock()
745 for {
746 if cs.committed {
747 cs.mu.Unlock()
748 // toRPCErr is used in case the error from the attempt comes from
749 // NewClientStream, which intentionally doesn't return a status
750 // error to allow for further inspection; all other errors should
751 // already be status errors.
752 return toRPCErr(op(cs.attempt))
753 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530754 if len(cs.buffer) == 0 {
755 // For the first op, which controls creation of the stream and
756 // assigns cs.attempt, we need to create a new attempt inline
757 // before executing the first op. On subsequent ops, the attempt
758 // is created immediately before replaying the ops.
759 var err error
760 if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
761 cs.mu.Unlock()
762 cs.finish(err)
763 return err
764 }
765 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400766 a := cs.attempt
767 cs.mu.Unlock()
768 err := op(a)
769 cs.mu.Lock()
770 if a != cs.attempt {
771 // We started another attempt already.
772 continue
773 }
774 if err == io.EOF {
775 <-a.s.Done()
776 }
777 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
778 onSuccess()
779 cs.mu.Unlock()
780 return err
781 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530782 if err := cs.retryLocked(a, err); err != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400783 cs.mu.Unlock()
784 return err
785 }
786 }
787}
788
789func (cs *clientStream) Header() (metadata.MD, error) {
790 var m metadata.MD
Akash Kankanala761955c2024-02-21 19:32:20 +0530791 noHeader := false
khenaidoo5fc5cea2021-08-11 17:39:16 -0400792 err := cs.withRetry(func(a *csAttempt) error {
793 var err error
794 m, err = a.s.Header()
Akash Kankanala761955c2024-02-21 19:32:20 +0530795 if err == transport.ErrNoHeaders {
796 noHeader = true
797 return nil
798 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400799 return toRPCErr(err)
800 }, cs.commitAttemptLocked)
Akash Kankanala761955c2024-02-21 19:32:20 +0530801
khenaidoo5fc5cea2021-08-11 17:39:16 -0400802 if err != nil {
803 cs.finish(err)
804 return nil, err
805 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530806
807 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
808 // Only log if binary log is on and header has not been logged, and
809 // there is actually headers to log.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400810 logEntry := &binarylog.ServerHeader{
811 OnClientSide: true,
812 Header: m,
813 PeerAddr: nil,
814 }
815 if peer, ok := peer.FromContext(cs.Context()); ok {
816 logEntry.PeerAddr = peer.Addr
817 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400818 cs.serverHeaderBinlogged = true
Akash Kankanala761955c2024-02-21 19:32:20 +0530819 for _, binlog := range cs.binlogs {
820 binlog.Log(cs.ctx, logEntry)
821 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400822 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530823 return m, nil
khenaidoo5fc5cea2021-08-11 17:39:16 -0400824}
825
826func (cs *clientStream) Trailer() metadata.MD {
827 // On RPC failure, we never need to retry, because usage requires that
828 // RecvMsg() returned a non-nil error before calling this function is valid.
829 // We would have retried earlier if necessary.
830 //
831 // Commit the attempt anyway, just in case users are not following those
832 // directions -- it will prevent races and should not meaningfully impact
833 // performance.
834 cs.commitAttempt()
835 if cs.attempt.s == nil {
836 return nil
837 }
838 return cs.attempt.s.Trailer()
839}
840
Akash Kankanala761955c2024-02-21 19:32:20 +0530841func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400842 for _, f := range cs.buffer {
Akash Kankanala761955c2024-02-21 19:32:20 +0530843 if err := f(attempt); err != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400844 return err
845 }
846 }
847 return nil
848}
849
850func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
851 // Note: we still will buffer if retry is disabled (for transparent retries).
852 if cs.committed {
853 return
854 }
855 cs.bufferSize += sz
856 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
857 cs.commitAttemptLocked()
858 return
859 }
860 cs.buffer = append(cs.buffer, op)
861}
862
863func (cs *clientStream) SendMsg(m interface{}) (err error) {
864 defer func() {
865 if err != nil && err != io.EOF {
866 // Call finish on the client stream for errors generated by this SendMsg
867 // call, as these indicate problems created by this client. (Transport
868 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
869 // error will be returned from RecvMsg eventually in that case, or be
870 // retried.)
871 cs.finish(err)
872 }
873 }()
874 if cs.sentLast {
875 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
876 }
877 if !cs.desc.ClientStreams {
878 cs.sentLast = true
879 }
880
881 // load hdr, payload, data
882 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
883 if err != nil {
884 return err
885 }
886
887 // TODO(dfawley): should we be checking len(data) instead?
888 if len(payload) > *cs.callInfo.maxSendMessageSize {
889 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
890 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400891 op := func(a *csAttempt) error {
Akash Kankanala761955c2024-02-21 19:32:20 +0530892 return a.sendMsg(m, hdr, payload, data)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400893 }
894 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
Akash Kankanala761955c2024-02-21 19:32:20 +0530895 if len(cs.binlogs) != 0 && err == nil {
896 cm := &binarylog.ClientMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400897 OnClientSide: true,
Akash Kankanala761955c2024-02-21 19:32:20 +0530898 Message: data,
899 }
900 for _, binlog := range cs.binlogs {
901 binlog.Log(cs.ctx, cm)
902 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400903 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530904 return err
khenaidoo5fc5cea2021-08-11 17:39:16 -0400905}
906
907func (cs *clientStream) RecvMsg(m interface{}) error {
Akash Kankanala761955c2024-02-21 19:32:20 +0530908 if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400909 // Call Header() to binary log header if it's not already logged.
910 cs.Header()
911 }
912 var recvInfo *payloadInfo
Akash Kankanala761955c2024-02-21 19:32:20 +0530913 if len(cs.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400914 recvInfo = &payloadInfo{}
915 }
916 err := cs.withRetry(func(a *csAttempt) error {
917 return a.recvMsg(m, recvInfo)
918 }, cs.commitAttemptLocked)
Akash Kankanala761955c2024-02-21 19:32:20 +0530919 if len(cs.binlogs) != 0 && err == nil {
920 sm := &binarylog.ServerMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400921 OnClientSide: true,
922 Message: recvInfo.uncompressedBytes,
Akash Kankanala761955c2024-02-21 19:32:20 +0530923 }
924 for _, binlog := range cs.binlogs {
925 binlog.Log(cs.ctx, sm)
926 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400927 }
928 if err != nil || !cs.desc.ServerStreams {
929 // err != nil or non-server-streaming indicates end of stream.
930 cs.finish(err)
931
Akash Kankanala761955c2024-02-21 19:32:20 +0530932 if len(cs.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400933 // finish will not log Trailer. Log Trailer here.
934 logEntry := &binarylog.ServerTrailer{
935 OnClientSide: true,
936 Trailer: cs.Trailer(),
937 Err: err,
938 }
939 if logEntry.Err == io.EOF {
940 logEntry.Err = nil
941 }
942 if peer, ok := peer.FromContext(cs.Context()); ok {
943 logEntry.PeerAddr = peer.Addr
944 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530945 for _, binlog := range cs.binlogs {
946 binlog.Log(cs.ctx, logEntry)
947 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400948 }
949 }
950 return err
951}
952
953func (cs *clientStream) CloseSend() error {
954 if cs.sentLast {
955 // TODO: return an error and finish the stream instead, due to API misuse?
956 return nil
957 }
958 cs.sentLast = true
959 op := func(a *csAttempt) error {
960 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
961 // Always return nil; io.EOF is the only error that might make sense
962 // instead, but there is no need to signal the client to call RecvMsg
963 // as the only use left for the stream after CloseSend is to call
964 // RecvMsg. This also matches historical behavior.
965 return nil
966 }
967 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
Akash Kankanala761955c2024-02-21 19:32:20 +0530968 if len(cs.binlogs) != 0 {
969 chc := &binarylog.ClientHalfClose{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400970 OnClientSide: true,
Akash Kankanala761955c2024-02-21 19:32:20 +0530971 }
972 for _, binlog := range cs.binlogs {
973 binlog.Log(cs.ctx, chc)
974 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400975 }
976 // We never returned an error here for reasons.
977 return nil
978}
979
980func (cs *clientStream) finish(err error) {
981 if err == io.EOF {
982 // Ending a stream with EOF indicates a success.
983 err = nil
984 }
985 cs.mu.Lock()
986 if cs.finished {
987 cs.mu.Unlock()
988 return
989 }
990 cs.finished = true
Akash Kankanala761955c2024-02-21 19:32:20 +0530991 for _, onFinish := range cs.callInfo.onFinish {
992 onFinish(err)
993 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400994 cs.commitAttemptLocked()
995 if cs.attempt != nil {
996 cs.attempt.finish(err)
997 // after functions all rely upon having a stream.
998 if cs.attempt.s != nil {
999 for _, o := range cs.opts {
1000 o.after(cs.callInfo, cs.attempt)
1001 }
1002 }
1003 }
1004 cs.mu.Unlock()
1005 // For binary logging. only log cancel in finish (could be caused by RPC ctx
1006 // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
1007 //
1008 // Only one of cancel or trailer needs to be logged. In the cases where
1009 // users don't call RecvMsg, users must have already canceled the RPC.
Akash Kankanala761955c2024-02-21 19:32:20 +05301010 if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
1011 c := &binarylog.Cancel{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001012 OnClientSide: true,
Akash Kankanala761955c2024-02-21 19:32:20 +05301013 }
1014 for _, binlog := range cs.binlogs {
1015 binlog.Log(cs.ctx, c)
1016 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001017 }
1018 if err == nil {
1019 cs.retryThrottler.successfulRPC()
1020 }
1021 if channelz.IsOn() {
1022 if err != nil {
1023 cs.cc.incrCallsFailed()
1024 } else {
1025 cs.cc.incrCallsSucceeded()
1026 }
1027 }
1028 cs.cancel()
1029}
1030
1031func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
1032 cs := a.cs
1033 if a.trInfo != nil {
1034 a.mu.Lock()
1035 if a.trInfo.tr != nil {
1036 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1037 }
1038 a.mu.Unlock()
1039 }
1040 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
1041 if !cs.desc.ClientStreams {
1042 // For non-client-streaming RPCs, we return nil instead of EOF on error
1043 // because the generated code requires it. finish is not called; RecvMsg()
1044 // will call it with the stream's status independently.
1045 return nil
1046 }
1047 return io.EOF
1048 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301049 for _, sh := range a.statsHandlers {
1050 sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001051 }
1052 if channelz.IsOn() {
1053 a.t.IncrMsgSent()
1054 }
1055 return nil
1056}
1057
1058func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
1059 cs := a.cs
Akash Kankanala761955c2024-02-21 19:32:20 +05301060 if len(a.statsHandlers) != 0 && payInfo == nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001061 payInfo = &payloadInfo{}
1062 }
1063
1064 if !a.decompSet {
1065 // Block until we receive headers containing received message encoding.
1066 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1067 if a.dc == nil || a.dc.Type() != ct {
1068 // No configured decompressor, or it does not match the incoming
1069 // message encoding; attempt to find a registered compressor that does.
1070 a.dc = nil
1071 a.decomp = encoding.GetCompressor(ct)
1072 }
1073 } else {
1074 // No compression is used; disable our decompressor.
1075 a.dc = nil
1076 }
1077 // Only initialize this state once per stream.
1078 a.decompSet = true
1079 }
1080 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
1081 if err != nil {
1082 if err == io.EOF {
1083 if statusErr := a.s.Status().Err(); statusErr != nil {
1084 return statusErr
1085 }
1086 return io.EOF // indicates successful end of stream.
1087 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301088
khenaidoo5fc5cea2021-08-11 17:39:16 -04001089 return toRPCErr(err)
1090 }
1091 if a.trInfo != nil {
1092 a.mu.Lock()
1093 if a.trInfo.tr != nil {
1094 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1095 }
1096 a.mu.Unlock()
1097 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301098 for _, sh := range a.statsHandlers {
1099 sh.HandleRPC(a.ctx, &stats.InPayload{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001100 Client: true,
1101 RecvTime: time.Now(),
1102 Payload: m,
1103 // TODO truncate large payload.
Akash Kankanala761955c2024-02-21 19:32:20 +05301104 Data: payInfo.uncompressedBytes,
1105 WireLength: payInfo.compressedLength + headerLen,
1106 CompressedLength: payInfo.compressedLength,
1107 Length: len(payInfo.uncompressedBytes),
khenaidoo5fc5cea2021-08-11 17:39:16 -04001108 })
1109 }
1110 if channelz.IsOn() {
1111 a.t.IncrMsgRecv()
1112 }
1113 if cs.desc.ServerStreams {
1114 // Subsequent messages should be received by subsequent RecvMsg calls.
1115 return nil
1116 }
1117 // Special handling for non-server-stream rpcs.
1118 // This recv expects EOF or errors, so we don't collect inPayload.
1119 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
1120 if err == nil {
1121 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1122 }
1123 if err == io.EOF {
1124 return a.s.Status().Err() // non-server streaming Recv returns nil on success
1125 }
1126 return toRPCErr(err)
1127}
1128
1129func (a *csAttempt) finish(err error) {
1130 a.mu.Lock()
1131 if a.finished {
1132 a.mu.Unlock()
1133 return
1134 }
1135 a.finished = true
1136 if err == io.EOF {
1137 // Ending a stream with EOF indicates a success.
1138 err = nil
1139 }
1140 var tr metadata.MD
1141 if a.s != nil {
1142 a.t.CloseStream(a.s, err)
1143 tr = a.s.Trailer()
1144 }
1145
Akash Kankanala761955c2024-02-21 19:32:20 +05301146 if a.pickResult.Done != nil {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001147 br := false
1148 if a.s != nil {
1149 br = a.s.BytesReceived()
1150 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301151 a.pickResult.Done(balancer.DoneInfo{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001152 Err: err,
1153 Trailer: tr,
1154 BytesSent: a.s != nil,
1155 BytesReceived: br,
1156 ServerLoad: balancerload.Parse(tr),
1157 })
1158 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301159 for _, sh := range a.statsHandlers {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001160 end := &stats.End{
1161 Client: true,
1162 BeginTime: a.beginTime,
1163 EndTime: time.Now(),
1164 Trailer: tr,
1165 Error: err,
1166 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301167 sh.HandleRPC(a.ctx, end)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001168 }
1169 if a.trInfo != nil && a.trInfo.tr != nil {
1170 if err == nil {
1171 a.trInfo.tr.LazyPrintf("RPC: [OK]")
1172 } else {
1173 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
1174 a.trInfo.tr.SetError()
1175 }
1176 a.trInfo.tr.Finish()
1177 a.trInfo.tr = nil
1178 }
1179 a.mu.Unlock()
1180}
1181
1182// newClientStream creates a ClientStream with the specified transport, on the
1183// given addrConn.
1184//
1185// It's expected that the given transport is either the same one in addrConn, or
1186// is already closed. To avoid race, transport is specified separately, instead
1187// of using ac.transpot.
1188//
1189// Main difference between this and ClientConn.NewStream:
1190// - no retry
1191// - no service config (or wait for service config)
1192// - no tracing or stats
1193func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
1194 if t == nil {
1195 // TODO: return RPC error here?
1196 return nil, errors.New("transport provided is nil")
1197 }
1198 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
1199 c := &callInfo{}
1200
1201 // Possible context leak:
1202 // The cancel function for the child context we create will only be called
1203 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
1204 // an error is generated by SendMsg.
1205 // https://github.com/grpc/grpc-go/issues/1818.
1206 ctx, cancel := context.WithCancel(ctx)
1207 defer func() {
1208 if err != nil {
1209 cancel()
1210 }
1211 }()
1212
1213 for _, o := range opts {
1214 if err := o.before(c); err != nil {
1215 return nil, toRPCErr(err)
1216 }
1217 }
1218 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
1219 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
1220 if err := setCallInfoCodec(c); err != nil {
1221 return nil, err
1222 }
1223
1224 callHdr := &transport.CallHdr{
1225 Host: ac.cc.authority,
1226 Method: method,
1227 ContentSubtype: c.contentSubtype,
1228 }
1229
1230 // Set our outgoing compression according to the UseCompressor CallOption, if
1231 // set. In that case, also find the compressor from the encoding package.
1232 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1233 // if set.
1234 var cp Compressor
1235 var comp encoding.Compressor
1236 if ct := c.compressorType; ct != "" {
1237 callHdr.SendCompress = ct
1238 if ct != encoding.Identity {
1239 comp = encoding.GetCompressor(ct)
1240 if comp == nil {
1241 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1242 }
1243 }
1244 } else if ac.cc.dopts.cp != nil {
1245 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1246 cp = ac.cc.dopts.cp
1247 }
1248 if c.creds != nil {
1249 callHdr.Creds = c.creds
1250 }
1251
1252 // Use a special addrConnStream to avoid retry.
1253 as := &addrConnStream{
1254 callHdr: callHdr,
1255 ac: ac,
1256 ctx: ctx,
1257 cancel: cancel,
1258 opts: opts,
1259 callInfo: c,
1260 desc: desc,
1261 codec: c.codec,
1262 cp: cp,
1263 comp: comp,
1264 t: t,
1265 }
1266
1267 s, err := as.t.NewStream(as.ctx, as.callHdr)
1268 if err != nil {
1269 err = toRPCErr(err)
1270 return nil, err
1271 }
1272 as.s = s
1273 as.p = &parser{r: s}
1274 ac.incrCallsStarted()
1275 if desc != unaryStreamDesc {
Akash Kankanala761955c2024-02-21 19:32:20 +05301276 // Listen on stream context to cleanup when the stream context is
1277 // canceled. Also listen for the addrConn's context in case the
1278 // addrConn is closed or reconnects to a different address. In all
1279 // other cases, an error should already be injected into the recv
1280 // buffer by the transport, which the client will eventually receive,
1281 // and then we will cancel the stream's context in
1282 // addrConnStream.finish.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001283 go func() {
Akash Kankanala761955c2024-02-21 19:32:20 +05301284 ac.mu.Lock()
1285 acCtx := ac.ctx
1286 ac.mu.Unlock()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001287 select {
Akash Kankanala761955c2024-02-21 19:32:20 +05301288 case <-acCtx.Done():
khenaidoo5fc5cea2021-08-11 17:39:16 -04001289 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1290 case <-ctx.Done():
1291 as.finish(toRPCErr(ctx.Err()))
1292 }
1293 }()
1294 }
1295 return as, nil
1296}
1297
1298type addrConnStream struct {
1299 s *transport.Stream
1300 ac *addrConn
1301 callHdr *transport.CallHdr
1302 cancel context.CancelFunc
1303 opts []CallOption
1304 callInfo *callInfo
1305 t transport.ClientTransport
1306 ctx context.Context
1307 sentLast bool
1308 desc *StreamDesc
1309 codec baseCodec
1310 cp Compressor
1311 comp encoding.Compressor
1312 decompSet bool
1313 dc Decompressor
1314 decomp encoding.Compressor
1315 p *parser
1316 mu sync.Mutex
1317 finished bool
1318}
1319
1320func (as *addrConnStream) Header() (metadata.MD, error) {
1321 m, err := as.s.Header()
1322 if err != nil {
1323 as.finish(toRPCErr(err))
1324 }
1325 return m, err
1326}
1327
1328func (as *addrConnStream) Trailer() metadata.MD {
1329 return as.s.Trailer()
1330}
1331
1332func (as *addrConnStream) CloseSend() error {
1333 if as.sentLast {
1334 // TODO: return an error and finish the stream instead, due to API misuse?
1335 return nil
1336 }
1337 as.sentLast = true
1338
1339 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1340 // Always return nil; io.EOF is the only error that might make sense
1341 // instead, but there is no need to signal the client to call RecvMsg
1342 // as the only use left for the stream after CloseSend is to call
1343 // RecvMsg. This also matches historical behavior.
1344 return nil
1345}
1346
1347func (as *addrConnStream) Context() context.Context {
1348 return as.s.Context()
1349}
1350
1351func (as *addrConnStream) SendMsg(m interface{}) (err error) {
1352 defer func() {
1353 if err != nil && err != io.EOF {
1354 // Call finish on the client stream for errors generated by this SendMsg
1355 // call, as these indicate problems created by this client. (Transport
1356 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1357 // error will be returned from RecvMsg eventually in that case, or be
1358 // retried.)
1359 as.finish(err)
1360 }
1361 }()
1362 if as.sentLast {
1363 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
1364 }
1365 if !as.desc.ClientStreams {
1366 as.sentLast = true
1367 }
1368
1369 // load hdr, payload, data
1370 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
1371 if err != nil {
1372 return err
1373 }
1374
1375 // TODO(dfawley): should we be checking len(data) instead?
1376 if len(payld) > *as.callInfo.maxSendMessageSize {
1377 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1378 }
1379
1380 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1381 if !as.desc.ClientStreams {
1382 // For non-client-streaming RPCs, we return nil instead of EOF on error
1383 // because the generated code requires it. finish is not called; RecvMsg()
1384 // will call it with the stream's status independently.
1385 return nil
1386 }
1387 return io.EOF
1388 }
1389
1390 if channelz.IsOn() {
1391 as.t.IncrMsgSent()
1392 }
1393 return nil
1394}
1395
1396func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1397 defer func() {
1398 if err != nil || !as.desc.ServerStreams {
1399 // err != nil or non-server-streaming indicates end of stream.
1400 as.finish(err)
1401 }
1402 }()
1403
1404 if !as.decompSet {
1405 // Block until we receive headers containing received message encoding.
1406 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1407 if as.dc == nil || as.dc.Type() != ct {
1408 // No configured decompressor, or it does not match the incoming
1409 // message encoding; attempt to find a registered compressor that does.
1410 as.dc = nil
1411 as.decomp = encoding.GetCompressor(ct)
1412 }
1413 } else {
1414 // No compression is used; disable our decompressor.
1415 as.dc = nil
1416 }
1417 // Only initialize this state once per stream.
1418 as.decompSet = true
1419 }
1420 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1421 if err != nil {
1422 if err == io.EOF {
1423 if statusErr := as.s.Status().Err(); statusErr != nil {
1424 return statusErr
1425 }
1426 return io.EOF // indicates successful end of stream.
1427 }
1428 return toRPCErr(err)
1429 }
1430
1431 if channelz.IsOn() {
1432 as.t.IncrMsgRecv()
1433 }
1434 if as.desc.ServerStreams {
1435 // Subsequent messages should be received by subsequent RecvMsg calls.
1436 return nil
1437 }
1438
1439 // Special handling for non-server-stream rpcs.
1440 // This recv expects EOF or errors, so we don't collect inPayload.
1441 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1442 if err == nil {
1443 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1444 }
1445 if err == io.EOF {
1446 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1447 }
1448 return toRPCErr(err)
1449}
1450
1451func (as *addrConnStream) finish(err error) {
1452 as.mu.Lock()
1453 if as.finished {
1454 as.mu.Unlock()
1455 return
1456 }
1457 as.finished = true
1458 if err == io.EOF {
1459 // Ending a stream with EOF indicates a success.
1460 err = nil
1461 }
1462 if as.s != nil {
1463 as.t.CloseStream(as.s, err)
1464 }
1465
1466 if err != nil {
1467 as.ac.incrCallsFailed()
1468 } else {
1469 as.ac.incrCallsSucceeded()
1470 }
1471 as.cancel()
1472 as.mu.Unlock()
1473}
1474
1475// ServerStream defines the server-side behavior of a streaming RPC.
1476//
Akash Kankanala761955c2024-02-21 19:32:20 +05301477// Errors returned from ServerStream methods are compatible with the status
1478// package. However, the status code will often not match the RPC status as
1479// seen by the client application, and therefore, should not be relied upon for
1480// this purpose.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001481type ServerStream interface {
1482 // SetHeader sets the header metadata. It may be called multiple times.
1483 // When call multiple times, all the provided metadata will be merged.
1484 // All the metadata will be sent out when one of the following happens:
1485 // - ServerStream.SendHeader() is called;
1486 // - The first response is sent out;
1487 // - An RPC status is sent out (error or success).
1488 SetHeader(metadata.MD) error
1489 // SendHeader sends the header metadata.
1490 // The provided md and headers set by SetHeader() will be sent.
1491 // It fails if called multiple times.
1492 SendHeader(metadata.MD) error
1493 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
1494 // When called more than once, all the provided metadata will be merged.
1495 SetTrailer(metadata.MD)
1496 // Context returns the context for this stream.
1497 Context() context.Context
1498 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1499 // error is returned directly.
1500 //
1501 // SendMsg blocks until:
1502 // - There is sufficient flow control to schedule m with the transport, or
1503 // - The stream is done, or
1504 // - The stream breaks.
1505 //
1506 // SendMsg does not wait until the message is received by the client. An
1507 // untimely stream closure may result in lost messages.
1508 //
1509 // It is safe to have a goroutine calling SendMsg and another goroutine
1510 // calling RecvMsg on the same stream at the same time, but it is not safe
1511 // to call SendMsg on the same stream in different goroutines.
Akash Kankanala761955c2024-02-21 19:32:20 +05301512 //
1513 // It is not safe to modify the message after calling SendMsg. Tracing
1514 // libraries and stats handlers may use the message lazily.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001515 SendMsg(m interface{}) error
1516 // RecvMsg blocks until it receives a message into m or the stream is
1517 // done. It returns io.EOF when the client has performed a CloseSend. On
1518 // any non-EOF error, the stream is aborted and the error contains the
1519 // RPC status.
1520 //
1521 // It is safe to have a goroutine calling SendMsg and another goroutine
1522 // calling RecvMsg on the same stream at the same time, but it is not
1523 // safe to call RecvMsg on the same stream in different goroutines.
1524 RecvMsg(m interface{}) error
1525}
1526
1527// serverStream implements a server side Stream.
1528type serverStream struct {
1529 ctx context.Context
1530 t transport.ServerTransport
1531 s *transport.Stream
1532 p *parser
1533 codec baseCodec
1534
1535 cp Compressor
1536 dc Decompressor
1537 comp encoding.Compressor
1538 decomp encoding.Compressor
1539
Akash Kankanala761955c2024-02-21 19:32:20 +05301540 sendCompressorName string
1541
khenaidoo5fc5cea2021-08-11 17:39:16 -04001542 maxReceiveMessageSize int
1543 maxSendMessageSize int
1544 trInfo *traceInfo
1545
Akash Kankanala761955c2024-02-21 19:32:20 +05301546 statsHandler []stats.Handler
khenaidoo5fc5cea2021-08-11 17:39:16 -04001547
Akash Kankanala761955c2024-02-21 19:32:20 +05301548 binlogs []binarylog.MethodLogger
khenaidoo5fc5cea2021-08-11 17:39:16 -04001549 // serverHeaderBinlogged indicates whether server header has been logged. It
1550 // will happen when one of the following two happens: stream.SendHeader(),
1551 // stream.Send().
1552 //
1553 // It's only checked in send and sendHeader, doesn't need to be
1554 // synchronized.
1555 serverHeaderBinlogged bool
1556
1557 mu sync.Mutex // protects trInfo.tr after the service handler runs.
1558}
1559
1560func (ss *serverStream) Context() context.Context {
1561 return ss.ctx
1562}
1563
1564func (ss *serverStream) SetHeader(md metadata.MD) error {
1565 if md.Len() == 0 {
1566 return nil
1567 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301568 err := imetadata.Validate(md)
1569 if err != nil {
1570 return status.Error(codes.Internal, err.Error())
1571 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001572 return ss.s.SetHeader(md)
1573}
1574
1575func (ss *serverStream) SendHeader(md metadata.MD) error {
Akash Kankanala761955c2024-02-21 19:32:20 +05301576 err := imetadata.Validate(md)
1577 if err != nil {
1578 return status.Error(codes.Internal, err.Error())
1579 }
1580
1581 err = ss.t.WriteHeader(ss.s, md)
1582 if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001583 h, _ := ss.s.Header()
Akash Kankanala761955c2024-02-21 19:32:20 +05301584 sh := &binarylog.ServerHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001585 Header: h,
Akash Kankanala761955c2024-02-21 19:32:20 +05301586 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001587 ss.serverHeaderBinlogged = true
Akash Kankanala761955c2024-02-21 19:32:20 +05301588 for _, binlog := range ss.binlogs {
1589 binlog.Log(ss.ctx, sh)
1590 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001591 }
1592 return err
1593}
1594
1595func (ss *serverStream) SetTrailer(md metadata.MD) {
1596 if md.Len() == 0 {
1597 return
1598 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301599 if err := imetadata.Validate(md); err != nil {
1600 logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
1601 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001602 ss.s.SetTrailer(md)
1603}
1604
1605func (ss *serverStream) SendMsg(m interface{}) (err error) {
1606 defer func() {
1607 if ss.trInfo != nil {
1608 ss.mu.Lock()
1609 if ss.trInfo.tr != nil {
1610 if err == nil {
1611 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
1612 } else {
1613 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1614 ss.trInfo.tr.SetError()
1615 }
1616 }
1617 ss.mu.Unlock()
1618 }
1619 if err != nil && err != io.EOF {
1620 st, _ := status.FromError(toRPCErr(err))
1621 ss.t.WriteStatus(ss.s, st)
1622 // Non-user specified status was sent out. This should be an error
1623 // case (as a server side Cancel maybe).
1624 //
1625 // This is not handled specifically now. User will return a final
1626 // status from the service handler, we will log that error instead.
1627 // This behavior is similar to an interceptor.
1628 }
1629 if channelz.IsOn() && err == nil {
1630 ss.t.IncrMsgSent()
1631 }
1632 }()
1633
Akash Kankanala761955c2024-02-21 19:32:20 +05301634 // Server handler could have set new compressor by calling SetSendCompressor.
1635 // In case it is set, we need to use it for compressing outbound message.
1636 if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
1637 ss.comp = encoding.GetCompressor(sendCompressorsName)
1638 ss.sendCompressorName = sendCompressorsName
1639 }
1640
khenaidoo5fc5cea2021-08-11 17:39:16 -04001641 // load hdr, payload, data
1642 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
1643 if err != nil {
1644 return err
1645 }
1646
1647 // TODO(dfawley): should we be checking len(data) instead?
1648 if len(payload) > ss.maxSendMessageSize {
1649 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
1650 }
1651 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
1652 return toRPCErr(err)
1653 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301654 if len(ss.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001655 if !ss.serverHeaderBinlogged {
1656 h, _ := ss.s.Header()
Akash Kankanala761955c2024-02-21 19:32:20 +05301657 sh := &binarylog.ServerHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001658 Header: h,
Akash Kankanala761955c2024-02-21 19:32:20 +05301659 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001660 ss.serverHeaderBinlogged = true
Akash Kankanala761955c2024-02-21 19:32:20 +05301661 for _, binlog := range ss.binlogs {
1662 binlog.Log(ss.ctx, sh)
1663 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001664 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301665 sm := &binarylog.ServerMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001666 Message: data,
Akash Kankanala761955c2024-02-21 19:32:20 +05301667 }
1668 for _, binlog := range ss.binlogs {
1669 binlog.Log(ss.ctx, sm)
1670 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001671 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301672 if len(ss.statsHandler) != 0 {
1673 for _, sh := range ss.statsHandler {
1674 sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
1675 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001676 }
1677 return nil
1678}
1679
1680func (ss *serverStream) RecvMsg(m interface{}) (err error) {
1681 defer func() {
1682 if ss.trInfo != nil {
1683 ss.mu.Lock()
1684 if ss.trInfo.tr != nil {
1685 if err == nil {
1686 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
1687 } else if err != io.EOF {
1688 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1689 ss.trInfo.tr.SetError()
1690 }
1691 }
1692 ss.mu.Unlock()
1693 }
1694 if err != nil && err != io.EOF {
1695 st, _ := status.FromError(toRPCErr(err))
1696 ss.t.WriteStatus(ss.s, st)
1697 // Non-user specified status was sent out. This should be an error
1698 // case (as a server side Cancel maybe).
1699 //
1700 // This is not handled specifically now. User will return a final
1701 // status from the service handler, we will log that error instead.
1702 // This behavior is similar to an interceptor.
1703 }
1704 if channelz.IsOn() && err == nil {
1705 ss.t.IncrMsgRecv()
1706 }
1707 }()
1708 var payInfo *payloadInfo
Akash Kankanala761955c2024-02-21 19:32:20 +05301709 if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001710 payInfo = &payloadInfo{}
1711 }
1712 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
1713 if err == io.EOF {
Akash Kankanala761955c2024-02-21 19:32:20 +05301714 if len(ss.binlogs) != 0 {
1715 chc := &binarylog.ClientHalfClose{}
1716 for _, binlog := range ss.binlogs {
1717 binlog.Log(ss.ctx, chc)
1718 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001719 }
1720 return err
1721 }
1722 if err == io.ErrUnexpectedEOF {
1723 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1724 }
1725 return toRPCErr(err)
1726 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301727 if len(ss.statsHandler) != 0 {
1728 for _, sh := range ss.statsHandler {
1729 sh.HandleRPC(ss.s.Context(), &stats.InPayload{
1730 RecvTime: time.Now(),
1731 Payload: m,
1732 // TODO truncate large payload.
1733 Data: payInfo.uncompressedBytes,
1734 Length: len(payInfo.uncompressedBytes),
1735 WireLength: payInfo.compressedLength + headerLen,
1736 CompressedLength: payInfo.compressedLength,
1737 })
1738 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001739 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301740 if len(ss.binlogs) != 0 {
1741 cm := &binarylog.ClientMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001742 Message: payInfo.uncompressedBytes,
Akash Kankanala761955c2024-02-21 19:32:20 +05301743 }
1744 for _, binlog := range ss.binlogs {
1745 binlog.Log(ss.ctx, cm)
1746 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001747 }
1748 return nil
1749}
1750
1751// MethodFromServerStream returns the method string for the input stream.
1752// The returned string is in the format of "/service/method".
1753func MethodFromServerStream(stream ServerStream) (string, bool) {
1754 return Method(stream.Context())
1755}
1756
1757// prepareMsg returns the hdr, payload and data
1758// using the compressors passed or using the
1759// passed preparedmsg
1760func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
1761 if preparedMsg, ok := m.(*PreparedMsg); ok {
1762 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
1763 }
1764 // The input interface is not a prepared msg.
1765 // Marshal and Compress the data at this point
1766 data, err = encode(codec, m)
1767 if err != nil {
1768 return nil, nil, nil, err
1769 }
1770 compData, err := compress(data, cp, comp)
1771 if err != nil {
1772 return nil, nil, nil, err
1773 }
1774 hdr, payload = msgHeader(data, compData)
1775 return hdr, payload, data, nil
1776}