blob: 4d7e89067c22d663140397309874cb53d8740f29 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "context"
26 "errors"
27 "fmt"
28 "io"
29 "net"
30 "sync"
31 "sync/atomic"
32
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/credentials"
35 "google.golang.org/grpc/keepalive"
36 "google.golang.org/grpc/metadata"
37 "google.golang.org/grpc/stats"
38 "google.golang.org/grpc/status"
39 "google.golang.org/grpc/tap"
40)
41
// recvMsg represents a message received from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
	// data is the received payload. It is nil whenever err is non-nil.
	data []byte
	// err classifies the message:
	//   nil: received some data
	//   io.EOF: stream is completed. data is nil.
	//   other non-nil error: transport failure. data is nil.
	err error
}
51
// recvBuffer is an unbounded channel of recvMsg structs.
//
// Note that recvBuffer differs from controlBuffer only in that recvBuffer
// holds a channel of only recvMsg structs instead of objects implementing the
// "item" interface. recvBuffer is written to much more often than
// controlBuffer, and using strict recvMsg structs helps avoid allocation in
// "recvBuffer.put".
type recvBuffer struct {
	c       chan recvMsg // channel handed out by get()
	mu      sync.Mutex   // held by put and load while they touch backlog and err
	backlog []recvMsg    // messages that could not be sent on c yet, oldest first
	err     error        // err of the last message accepted by put; once non-nil, put drops everything
}
63
64func newRecvBuffer() *recvBuffer {
65 b := &recvBuffer{
66 c: make(chan recvMsg, 1),
67 }
68 return b
69}
70
71func (b *recvBuffer) put(r recvMsg) {
72 b.mu.Lock()
73 if b.err != nil {
74 b.mu.Unlock()
75 // An error had occurred earlier, don't accept more
76 // data or errors.
77 return
78 }
79 b.err = r.err
80 if len(b.backlog) == 0 {
81 select {
82 case b.c <- r:
83 b.mu.Unlock()
84 return
85 default:
86 }
87 }
88 b.backlog = append(b.backlog, r)
89 b.mu.Unlock()
90}
91
92func (b *recvBuffer) load() {
93 b.mu.Lock()
94 if len(b.backlog) > 0 {
95 select {
96 case b.c <- b.backlog[0]:
97 b.backlog[0] = recvMsg{}
98 b.backlog = b.backlog[1:]
99 default:
100 }
101 }
102 b.mu.Unlock()
103}
104
// get returns the receive-only channel on which buffered recvMsg values are
// delivered.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
112
// recvBufferReader implements the io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
	ctx     context.Context
	ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
	recv    *recvBuffer
	last    []byte // Stores the remaining data in the previous calls.
	err     error  // result of the last read; once non-nil, Read keeps returning it
}
123
124// Read reads the next len(p) bytes from last. If last is drained, it tries to
125// read additional data from recv. It blocks if there no additional data available
126// in recv. If Read returns any non-nil error, it will continue to return that error.
127func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128 if r.err != nil {
129 return 0, r.err
130 }
131 n, r.err = r.read(p)
132 return n, r.err
133}
134
135func (r *recvBufferReader) read(p []byte) (n int, err error) {
136 if r.last != nil && len(r.last) > 0 {
137 // Read remaining data left in last call.
138 copied := copy(p, r.last)
139 r.last = r.last[copied:]
140 return copied, nil
141 }
142 select {
143 case <-r.ctxDone:
144 return 0, ContextErr(r.ctx.Err())
145 case m := <-r.recv.get():
146 r.recv.load()
147 if m.err != nil {
148 return 0, m.err
149 }
150 copied := copy(p, m.data)
151 r.last = m.data[copied:]
152 return copied, nil
153 }
154}
155
// streamState is the lifecycle state of a Stream. It is a uint32 so that it
// can be read and updated with sync/atomic operations.
type streamState uint32

const (
	// streamActive is the initial state.
	streamActive streamState = 0
	// streamWriteDone means EndStream was sent.
	streamWriteDone streamState = 1
	// streamReadDone means EndStream was received.
	streamReadDone streamState = 2
	// streamDone means the entire stream is finished.
	streamDone streamState = 3
)
164
// Stream represents an RPC in the transport layer.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string             // compression of inbound messages; returned by RecvCompress
	sendCompress string             // set by SetSendCompress
	buf          *recvBuffer        // receives the messages passed to write
	trReader     io.Reader          // Read asserts this to be a *transportReader
	fc           *inFlow
	wq           *writeQuota

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan chan struct{} // closed to indicate the end of header metadata.
	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.

	// hdrMu protects header and trailer metadata on the server-side.
	hdrMu sync.Mutex
	// On client side, header keeps the received header metadata.
	//
	// On server side, header keeps the header set by SetHeader(). The complete
	// header will be merged into this after t.WriteHeader() is called.
	header  metadata.MD
	trailer metadata.MD // the key-value map of trailer metadata.

	noHeaders bool // set if the client never received headers (set only after the stream is done).

	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
	headerSent uint32

	// state is accessed atomically through swapState, compareAndSwapState
	// and getState.
	state streamState

	// On client-side it is the status error received from the server.
	// On server-side it is unused.
	status *status.Status

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

	// contentSubtype is the content-subtype for requests.
	// This must be lowercase or the behavior is undefined.
	contentSubtype string
}
215
216// isHeaderSent is only valid on the server-side.
217func (s *Stream) isHeaderSent() bool {
218 return atomic.LoadUint32(&s.headerSent) == 1
219}
220
221// updateHeaderSent updates headerSent and returns true
222// if it was alreay set. It is valid only on server-side.
223func (s *Stream) updateHeaderSent() bool {
224 return atomic.SwapUint32(&s.headerSent, 1) == 1
225}
226
227func (s *Stream) swapState(st streamState) streamState {
228 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
229}
230
231func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
232 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
233}
234
235func (s *Stream) getState() streamState {
236 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
237}
238
239func (s *Stream) waitOnHeader() error {
240 if s.headerChan == nil {
241 // On the server headerChan is always nil since a stream originates
242 // only after having received headers.
243 return nil
244 }
245 select {
246 case <-s.ctx.Done():
247 return ContextErr(s.ctx.Err())
248 case <-s.headerChan:
249 return nil
250 }
251}
252
253// RecvCompress returns the compression algorithm applied to the inbound
254// message. It is empty string if there is no compression applied.
255func (s *Stream) RecvCompress() string {
256 if err := s.waitOnHeader(); err != nil {
257 return ""
258 }
259 return s.recvCompress
260}
261
// SetSendCompress sets the compression algorithm of the stream to str.
// The field is written without any locking.
func (s *Stream) SetSendCompress(str string) {
	s.sendCompress = str
}
266
// Done returns a channel which is closed when the stream receives the final
// status from the server.
func (s *Stream) Done() <-chan struct{} {
	return s.done
}
272
273// Header returns the header metadata of the stream.
274//
275// On client side, it acquires the key-value pairs of header metadata once it is
276// available. It blocks until i) the metadata is ready or ii) there is no header
277// metadata or iii) the stream is canceled/expired.
278//
279// On server side, it returns the out header after t.WriteHeader is called.
280func (s *Stream) Header() (metadata.MD, error) {
281 if s.headerChan == nil && s.header != nil {
282 // On server side, return the header in stream. It will be the out
283 // header after t.WriteHeader is called.
284 return s.header.Copy(), nil
285 }
286 err := s.waitOnHeader()
287 // Even if the stream is closed, header is returned if available.
288 select {
289 case <-s.headerChan:
290 if s.header == nil {
291 return nil, nil
292 }
293 return s.header.Copy(), nil
294 default:
295 }
296 return nil, err
297}
298
299// TrailersOnly blocks until a header or trailers-only frame is received and
300// then returns true if the stream was trailers-only. If the stream ends
301// before headers are received, returns true, nil. If a context error happens
302// first, returns it as a status error. Client-side only.
303func (s *Stream) TrailersOnly() (bool, error) {
304 err := s.waitOnHeader()
305 if err != nil {
306 return false, err
307 }
308 // if !headerDone, some other connection error occurred.
309 return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
310}
311
312// Trailer returns the cached trailer metedata. Note that if it is not called
313// after the entire stream is done, it could return an empty MD. Client
314// side only.
315// It can be safely read only after stream has ended that is either read
316// or write have returned io.EOF.
317func (s *Stream) Trailer() metadata.MD {
318 c := s.trailer.Copy()
319 return c
320}
321
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". This will always be lowercase. See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
func (s *Stream) ContentSubtype() string {
	return s.contentSubtype
}
330
// Context returns the context associated with the stream.
func (s *Stream) Context() context.Context {
	return s.ctx
}
335
// Method returns the RPC method associated with the stream.
func (s *Stream) Method() string {
	return s.method
}
340
// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
// that is, after Done() is closed. The field is read without any
// synchronization.
func (s *Stream) Status() *status.Status {
	return s.status
}
347
348// SetHeader sets the header metadata. This can be called multiple times.
349// Server side only.
350// This should not be called in parallel to other data writes.
351func (s *Stream) SetHeader(md metadata.MD) error {
352 if md.Len() == 0 {
353 return nil
354 }
355 if s.isHeaderSent() || s.getState() == streamDone {
356 return ErrIllegalHeaderWrite
357 }
358 s.hdrMu.Lock()
359 s.header = metadata.Join(s.header, md)
360 s.hdrMu.Unlock()
361 return nil
362}
363
// SendHeader sends the given header metadata. The given metadata is
// combined with any metadata set by previous calls to SetHeader and
// then written to the transport stream. It delegates to the stream's
// ServerTransport (s.st), which is nil on client-side streams.
func (s *Stream) SendHeader(md metadata.MD) error {
	return s.st.WriteHeader(s, md)
}
370
371// SetTrailer sets the trailer metadata which will be sent with the RPC status
372// by the server. This can be called multiple times. Server side only.
373// This should not be called parallel to other data writes.
374func (s *Stream) SetTrailer(md metadata.MD) error {
375 if md.Len() == 0 {
376 return nil
377 }
378 if s.getState() == streamDone {
379 return ErrIllegalHeaderWrite
380 }
381 s.hdrMu.Lock()
382 s.trailer = metadata.Join(s.trailer, md)
383 s.hdrMu.Unlock()
384 return nil
385}
386
// write hands the received message m to the stream's recvBuffer.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
390
391// Read reads all p bytes from the wire for this stream.
392func (s *Stream) Read(p []byte) (n int, err error) {
393 // Don't request a read if there was an error earlier
394 if er := s.trReader.(*transportReader).er; er != nil {
395 return 0, er
396 }
397 s.requestRead(len(p))
398 return io.ReadFull(s.trReader, p)
399}
400
// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
	reader io.Reader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	// er is the most recent error returned by reader; Stream.Read checks
	// it before requesting more data.
	er error
}
412
413func (t *transportReader) Read(p []byte) (n int, err error) {
414 n, err = t.reader.Read(p)
415 if err != nil {
416 t.er = err
417 return
418 }
419 t.windowHandler(n)
420 return
421}
422
423// BytesReceived indicates whether any bytes have been received on this stream.
424func (s *Stream) BytesReceived() bool {
425 return atomic.LoadUint32(&s.bytesReceived) == 1
426}
427
428// Unprocessed indicates whether the server did not process this stream --
429// i.e. it sent a refused stream or GOAWAY including this stream ID.
430func (s *Stream) Unprocessed() bool {
431 return atomic.LoadUint32(&s.unprocessed) == 1
432}
433
// GoString is implemented by Stream so context.String() won't
// race when printing %#v. The output contains only the stream's pointer
// and its method name.
func (s *Stream) GoString() string {
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
439
// transportState is the state of a transport.
type transportState int

// The transport states, numbered from zero in declaration order.
const (
	reachable transportState = 0
	closing   transportState = 1
	draining  transportState = 2
)
448
// ServerConfig consists of all the configurations to establish a server
// transport. It is the config argument of NewServerTransport.
type ServerConfig struct {
	MaxStreams            uint32
	AuthInfo              credentials.AuthInfo
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	ChannelzParentID      int64
	MaxHeaderListSize     *uint32
}
464
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails. It always creates an HTTP/2 server transport; the protocol
// argument is not consulted.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
	return newHTTP2Server(conn, config)
}
470
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the addrConn id which initiated the creation of this client transport.
	ChannelzParentID int64
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
	MaxHeaderListSize *uint32
}
504
// TargetInfo contains the information of the target such as network address
// and metadata. It is the target argument of NewClientTransport.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
511
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller. It always creates an HTTP/2 client transport,
// passing every argument through to newHTTP2Client unchanged.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
}
517
// Options provides additional hints and information for message
// transmission. It is the opts argument of the transports' Write methods.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
525
// CallHdr carries the information of a particular RPC. It is the callHdr
// argument of ClientTransport.NewStream.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	// PreviousAttempts is the value of the grpc-previous-rpc-attempts
	// header to set.
	PreviousAttempts int
}
551
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close() error

	// GracefulClose starts to tear down the transport. It stops accepting
	// new RPCs and waits for the completion of the pending RPCs.
	GracefulClose() error

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received.
	GetGoAwayReason() GoAwayReason

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
598
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client. WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close() error

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// Drain notifies the client that this ServerTransport stops accepting
	// new RPCs.
	Drain()

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
637
638// connectionErrorf creates an ConnectionError with the specified error description.
639func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
640 return ConnectionError{
641 Desc: fmt.Sprintf(format, a...),
642 temp: temp,
643 err: e,
644 }
645}
646
// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string // description, rendered by Error
	temp bool   // reported by Temporary
	err  error  // original error, returned by Origin when non-nil
}
654
// Error implements the error interface. The message contains the quoted
// description only; the original error is not included.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}
658
// Temporary indicates if this connection error is temporary (true) or
// fatal (false).
func (e ConnectionError) Temporary() bool {
	return e.temp
}
663
664// Origin returns the original error of this connection error.
665func (e ConnectionError) Origin() error {
666 // Never return nil error here.
667 // If the original error is nil, return itself.
668 if e.err == nil {
669 return e
670 }
671 return e.err
672}
673
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// temporary ConnectionError.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate
	// an error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
688
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

// The values are numbered from zero in declaration order; do not reorder.
const (
	// GoAwayInvalid (0) indicates that no GoAway frame is received.
	GoAwayInvalid GoAwayReason = iota
	// GoAwayNoReason (1) is the default value when GoAway frame is received.
	GoAwayNoReason
	// GoAwayTooManyPings (2) indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings
)
702
// channelzData is used to store channelz related data for http2Client and http2Server.
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do
// atomic operations on an int64 variable on a 32-bit machine, the user is responsible for
// enforcing memory alignment. Here, by grouping those int64 fields inside a struct, we are
// enforcing the alignment.
type channelzData struct {
	kpCount int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// Client side: The number of streams that have ended successfully by receiving
	// EoS bit set frame from server.
	// Server side: The number of streams that have ended successfully by sending
	// frame with EoS bit set.
	streamsSucceeded int64
	streamsFailed    int64
	// lastStreamCreatedTime stores the timestamp at which the last stream was created. It is
	// of int64 type instead of time.Time since it's more costly to atomically update a
	// time.Time variable than an int64 variable. The same goes for lastMsgSentTime and
	// lastMsgRecvTime.
	lastStreamCreatedTime int64
	msgSent               int64
	msgRecv               int64
	lastMsgSentTime       int64
	lastMsgRecvTime       int64
}
726
727// ContextErr converts the error from context package into a status error.
728func ContextErr(err error) error {
729 switch err {
730 case context.DeadlineExceeded:
731 return status.Error(codes.DeadlineExceeded, err.Error())
732 case context.Canceled:
733 return status.Error(codes.Canceled, err.Error())
734 }
735 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
736}