/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "context"
26 "errors"
27 "fmt"
28 "io"
29 "net"
30 "sync"
31 "sync/atomic"
32
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/credentials"
35 "google.golang.org/grpc/keepalive"
36 "google.golang.org/grpc/metadata"
37 "google.golang.org/grpc/stats"
38 "google.golang.org/grpc/status"
39 "google.golang.org/grpc/tap"
40)
41
// recvMsg is a single message handed from the transport to a stream reader.
// All transport protocol specific info has been removed.
type recvMsg struct {
	// data is the received payload. It is nil whenever err is non-nil.
	data []byte
	// err classifies the message:
	//   nil: some data was received.
	//   io.EOF: the stream is completed.
	//   other non-nil error: transport failure.
	err error
}

// recvBuffer is an unbounded channel of recvMsg structs: a one-slot channel
// backed by a mutex-guarded overflow slice.
//
// It differs from controlBuffer only in that it carries concrete recvMsg
// values rather than objects implementing the "item" interface. recvBuffer is
// written to much more often than controlBuffer, and using the concrete type
// helps avoid an allocation in put.
type recvBuffer struct {
	c       chan recvMsg // one-slot channel consumers receive from
	mu      sync.Mutex   // guards backlog and err
	backlog []recvMsg    // messages waiting for the slot in c to free up
	err     error        // first non-nil error accepted by put; later puts are dropped
}

// newRecvBuffer returns an empty recvBuffer whose channel holds one message.
func newRecvBuffer() *recvBuffer {
	return &recvBuffer{c: make(chan recvMsg, 1)}
}

// put enqueues r. Once a message carrying a non-nil error has been accepted,
// every later message (data or error) is silently discarded.
func (b *recvBuffer) put(r recvMsg) {
	b.mu.Lock()
	if b.err != nil {
		// An earlier message already carried an error; accept nothing more.
		b.mu.Unlock()
		return
	}
	b.err = r.err

	delivered := false
	if len(b.backlog) == 0 {
		// Nothing is queued ahead of r, so try to hand it straight to the
		// channel without blocking.
		select {
		case b.c <- r:
			delivered = true
		default:
		}
	}
	if !delivered {
		b.backlog = append(b.backlog, r)
	}
	b.mu.Unlock()
}

// load moves the oldest backlog entry onto the channel if the channel has
// room; otherwise it does nothing.
func (b *recvBuffer) load() {
	b.mu.Lock()
	if pending := b.backlog; len(pending) > 0 {
		select {
		case b.c <- pending[0]:
			// Clear the slot so the backing array no longer references the
			// message, then advance past it.
			pending[0] = recvMsg{}
			b.backlog = pending[1:]
		default:
		}
	}
	b.mu.Unlock()
}

// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
112
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
	// closeStream closes the client transport stream with the given error and
	// nil trailer metadata. Read uses the client read path when it is non-nil.
	closeStream func(error)
	ctx         context.Context
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
	recv        *recvBuffer
	last        []byte // Stores the remaining data in the previous calls.
	err         error  // first error returned by Read; returned again on every later call.
}
123
124// Read reads the next len(p) bytes from last. If last is drained, it tries to
125// read additional data from recv. It blocks if there no additional data available
126// in recv. If Read returns any non-nil error, it will continue to return that error.
127func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128 if r.err != nil {
129 return 0, r.err
130 }
131 if r.last != nil && len(r.last) > 0 {
132 // Read remaining data left in last call.
133 copied := copy(p, r.last)
134 r.last = r.last[copied:]
135 return copied, nil
136 }
137 if r.closeStream != nil {
138 n, r.err = r.readClient(p)
139 } else {
140 n, r.err = r.read(p)
141 }
142 return n, r.err
143}
144
145func (r *recvBufferReader) read(p []byte) (n int, err error) {
146 select {
147 case <-r.ctxDone:
148 return 0, ContextErr(r.ctx.Err())
149 case m := <-r.recv.get():
150 return r.readAdditional(m, p)
151 }
152}
153
154func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
155 // If the context is canceled, then closes the stream with nil metadata.
156 // closeStream writes its error parameter to r.recv as a recvMsg.
157 // r.readAdditional acts on that message and returns the necessary error.
158 select {
159 case <-r.ctxDone:
160 r.closeStream(ContextErr(r.ctx.Err()))
161 m := <-r.recv.get()
162 return r.readAdditional(m, p)
163 case m := <-r.recv.get():
164 return r.readAdditional(m, p)
165 }
166}
167
168func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
169 r.recv.load()
170 if m.err != nil {
171 return 0, m.err
172 }
173 copied := copy(p, m.data)
174 r.last = m.data[copied:]
175 return copied, nil
176}
177
// streamState is the lifecycle state of a Stream. It is stored in a uint32
// so the Stream helpers can read and update it with sync/atomic.
type streamState uint32

const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
186
// Stream represents an RPC in the transport layer.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string
	sendCompress string
	buf          *recvBuffer
	trReader     io.Reader
	fc           *inFlow
	wq           *writeQuota

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan chan struct{} // closed to indicate the end of header metadata.
	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.

	// hdrMu protects header and trailer metadata on the server-side.
	hdrMu sync.Mutex
	// On client side, header keeps the received header metadata.
	//
	// On server side, header keeps the header set by SetHeader(). The complete
	// header will be merged into this after t.WriteHeader() is called.
	header  metadata.MD
	trailer metadata.MD // the key-value map of trailer metadata.

	noHeaders bool // set if the client never received headers (set only after the stream is done).

	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
	headerSent uint32

	// state is the stream's lifecycle state; the helpers in this file access
	// it only through sync/atomic.
	state streamState

	// On client-side it is the status error received from the server.
	// On server-side it is unused.
	status *status.Status

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

	// contentSubtype is the content-subtype for requests.
	// This must be lowercase or the behavior is undefined.
	contentSubtype string
}
237
238// isHeaderSent is only valid on the server-side.
239func (s *Stream) isHeaderSent() bool {
240 return atomic.LoadUint32(&s.headerSent) == 1
241}
242
243// updateHeaderSent updates headerSent and returns true
244// if it was alreay set. It is valid only on server-side.
245func (s *Stream) updateHeaderSent() bool {
246 return atomic.SwapUint32(&s.headerSent, 1) == 1
247}
248
249func (s *Stream) swapState(st streamState) streamState {
250 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
251}
252
253func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
254 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
255}
256
257func (s *Stream) getState() streamState {
258 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
259}
260
261func (s *Stream) waitOnHeader() error {
262 if s.headerChan == nil {
263 // On the server headerChan is always nil since a stream originates
264 // only after having received headers.
265 return nil
266 }
267 select {
268 case <-s.ctx.Done():
269 return ContextErr(s.ctx.Err())
270 case <-s.headerChan:
271 return nil
272 }
273}
274
275// RecvCompress returns the compression algorithm applied to the inbound
276// message. It is empty string if there is no compression applied.
277func (s *Stream) RecvCompress() string {
278 if err := s.waitOnHeader(); err != nil {
279 return ""
280 }
281 return s.recvCompress
282}
283
284// SetSendCompress sets the compression algorithm to the stream.
285func (s *Stream) SetSendCompress(str string) {
286 s.sendCompress = str
287}
288
289// Done returns a channel which is closed when it receives the final status
290// from the server.
291func (s *Stream) Done() <-chan struct{} {
292 return s.done
293}
294
295// Header returns the header metadata of the stream.
296//
297// On client side, it acquires the key-value pairs of header metadata once it is
298// available. It blocks until i) the metadata is ready or ii) there is no header
299// metadata or iii) the stream is canceled/expired.
300//
301// On server side, it returns the out header after t.WriteHeader is called.
302func (s *Stream) Header() (metadata.MD, error) {
303 if s.headerChan == nil && s.header != nil {
304 // On server side, return the header in stream. It will be the out
305 // header after t.WriteHeader is called.
306 return s.header.Copy(), nil
307 }
308 err := s.waitOnHeader()
309 // Even if the stream is closed, header is returned if available.
310 select {
311 case <-s.headerChan:
312 if s.header == nil {
313 return nil, nil
314 }
315 return s.header.Copy(), nil
316 default:
317 }
318 return nil, err
319}
320
321// TrailersOnly blocks until a header or trailers-only frame is received and
322// then returns true if the stream was trailers-only. If the stream ends
323// before headers are received, returns true, nil. If a context error happens
324// first, returns it as a status error. Client-side only.
325func (s *Stream) TrailersOnly() (bool, error) {
326 err := s.waitOnHeader()
327 if err != nil {
328 return false, err
329 }
330 return s.noHeaders, nil
331}
332
333// Trailer returns the cached trailer metedata. Note that if it is not called
334// after the entire stream is done, it could return an empty MD. Client
335// side only.
336// It can be safely read only after stream has ended that is either read
337// or write have returned io.EOF.
338func (s *Stream) Trailer() metadata.MD {
339 c := s.trailer.Copy()
340 return c
341}
342
343// ContentSubtype returns the content-subtype for a request. For example, a
344// content-subtype of "proto" will result in a content-type of
345// "application/grpc+proto". This will always be lowercase. See
346// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
347// more details.
348func (s *Stream) ContentSubtype() string {
349 return s.contentSubtype
350}
351
352// Context returns the context of the stream.
353func (s *Stream) Context() context.Context {
354 return s.ctx
355}
356
357// Method returns the method for the stream.
358func (s *Stream) Method() string {
359 return s.method
360}
361
362// Status returns the status received from the server.
363// Status can be read safely only after the stream has ended,
364// that is, after Done() is closed.
365func (s *Stream) Status() *status.Status {
366 return s.status
367}
368
369// SetHeader sets the header metadata. This can be called multiple times.
370// Server side only.
371// This should not be called in parallel to other data writes.
372func (s *Stream) SetHeader(md metadata.MD) error {
373 if md.Len() == 0 {
374 return nil
375 }
376 if s.isHeaderSent() || s.getState() == streamDone {
377 return ErrIllegalHeaderWrite
378 }
379 s.hdrMu.Lock()
380 s.header = metadata.Join(s.header, md)
381 s.hdrMu.Unlock()
382 return nil
383}
384
385// SendHeader sends the given header metadata. The given metadata is
386// combined with any metadata set by previous calls to SetHeader and
387// then written to the transport stream.
388func (s *Stream) SendHeader(md metadata.MD) error {
389 return s.st.WriteHeader(s, md)
390}
391
392// SetTrailer sets the trailer metadata which will be sent with the RPC status
393// by the server. This can be called multiple times. Server side only.
394// This should not be called parallel to other data writes.
395func (s *Stream) SetTrailer(md metadata.MD) error {
396 if md.Len() == 0 {
397 return nil
398 }
399 if s.getState() == streamDone {
400 return ErrIllegalHeaderWrite
401 }
402 s.hdrMu.Lock()
403 s.trailer = metadata.Join(s.trailer, md)
404 s.hdrMu.Unlock()
405 return nil
406}
407
408func (s *Stream) write(m recvMsg) {
409 s.buf.put(m)
410}
411
412// Read reads all p bytes from the wire for this stream.
413func (s *Stream) Read(p []byte) (n int, err error) {
414 // Don't request a read if there was an error earlier
415 if er := s.trReader.(*transportReader).er; er != nil {
416 return 0, er
417 }
418 s.requestRead(len(p))
419 return io.ReadFull(s.trReader, p)
420}
421
// transportReader reads all the data available for this Stream from the
// transport and passes them into the decoder, which converts them into a gRPC
// message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
	reader io.Reader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	// er is the most recent error returned by reader.
	er error
}

// Read forwards to the underlying reader. On error it records the error in
// t.er and returns without invoking windowHandler; on success it reports the
// number of bytes read to windowHandler.
func (t *transportReader) Read(p []byte) (n int, err error) {
	n, err = t.reader.Read(p)
	if err == nil {
		t.windowHandler(n)
		return n, nil
	}
	t.er = err
	return n, err
}
443
444// BytesReceived indicates whether any bytes have been received on this stream.
445func (s *Stream) BytesReceived() bool {
446 return atomic.LoadUint32(&s.bytesReceived) == 1
447}
448
449// Unprocessed indicates whether the server did not process this stream --
450// i.e. it sent a refused stream or GOAWAY including this stream ID.
451func (s *Stream) Unprocessed() bool {
452 return atomic.LoadUint32(&s.unprocessed) == 1
453}
454
455// GoString is implemented by Stream so context.String() won't
456// race when printing %#v.
457func (s *Stream) GoString() string {
458 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
459}
460
// transportState is the state of a transport.
type transportState int

// The possible transport states. reachable is the zero value.
const (
	reachable transportState = iota
	closing
	draining
)
469
// ServerConfig consists of all the configurations to establish a server transport.
// NewServerTransport passes it unchanged to the HTTP/2 server constructor.
type ServerConfig struct {
	MaxStreams            uint32
	AuthInfo              credentials.AuthInfo
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	ChannelzParentID      int64
	// NOTE(review): presumably nil means "no explicit limit configured",
	// since this is a pointer -- confirm against the HTTP/2 server.
	MaxHeaderListSize *uint32
}
485
486// NewServerTransport creates a ServerTransport with conn or non-nil error
487// if it fails.
488func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
489 return newHTTP2Server(conn, config)
490}
491
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn
	// determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn
	// determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the addrConn id which initiated the creation of
	// this client transport.
	ChannelzParentID int64
	// MaxHeaderListSize sets the max (uncompressed) size of header list that
	// is prepared to be received.
	MaxHeaderListSize *uint32
}
525
// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
532
533// NewClientTransport establishes the transport with the required ConnectOptions
534// and returns it to the caller.
535func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
536 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
537}
538
// Options provides additional hints and information for message
// transmission. It is the opts argument of the transports' Write methods.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
546
// CallHdr carries the information of a particular RPC. It is the argument
// ClientTransport.NewStream uses to create a Stream.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	// PreviousAttempts is the value of the grpc-previous-rpc-attempts header
	// to set.
	PreviousAttempts int
}
572
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close() error

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return error. Once all streams are
	// finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why GoAway frame was received.
	GetGoAwayReason() GoAwayReason

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
625
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client. WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close() error

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// Drain notifies the client this ServerTransport stops accepting new RPCs.
	Drain()

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
664
// connectionErrorf creates a ConnectionError whose description is built from
// format and a, carrying the given temporary flag and original error.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	description := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: description, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string
	temp bool  // reported by Temporary
	err  error // original error, if any; see Origin
}

// Error implements the error interface, quoting Desc in the message.
func (e ConnectionError) Error() string {
	const layout = "connection error: desc = %q"
	return fmt.Sprintf(layout, e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when there is no original error, the ConnectionError itself
// is returned.
func (e ConnectionError) Origin() error {
	if e.err != nil {
		return e.err
	}
	return e
}
700
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// ConnectionError marked temporary.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate
	// an error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
715
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received. It is the
	// zero value of GoAwayReason.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
729
// channelzData is used to store channelz related data for http2Client and http2Server.
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
//
// Every field is an int64; do not add fields of other sizes, as that would
// defeat the alignment described above.
type channelzData struct {
	kpCount int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// Client side: The number of streams that have ended successfully by receiving
	// EoS bit set frame from server.
	// Server side: The number of streams that have ended successfully by sending
	// frame with EoS bit set.
	streamsSucceeded int64
	streamsFailed    int64
	// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
	// instead of time.Time since it's more costly to atomically update time.Time variable than int64
	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
	lastStreamCreatedTime int64
	msgSent               int64
	msgRecv               int64
	lastMsgSentTime       int64
	lastMsgRecvTime       int64
}
753
754// ContextErr converts the error from context package into a status error.
755func ContextErr(err error) error {
756 switch err {
757 case context.DeadlineExceeded:
758 return status.Error(codes.DeadlineExceeded, err.Error())
759 case context.Canceled:
760 return status.Error(codes.Canceled, err.Error())
761 }
762 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
763}