blob: a30da9eb324f5439b7afc39e406b06d81a8b3479 [file] [log] [blame]
Scott Baker105df152020-04-13 15:55:14 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "bytes"
26 "context"
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/stats"
39 "google.golang.org/grpc/status"
40 "google.golang.org/grpc/tap"
41)
42
43type bufferPool struct {
44 pool sync.Pool
45}
46
47func newBufferPool() *bufferPool {
48 return &bufferPool{
49 pool: sync.Pool{
50 New: func() interface{} {
51 return new(bytes.Buffer)
52 },
53 },
54 }
55}
56
57func (p *bufferPool) get() *bytes.Buffer {
58 return p.pool.Get().(*bytes.Buffer)
59}
60
61func (p *bufferPool) put(b *bytes.Buffer) {
62 p.pool.Put(b)
63}
64
65// recvMsg represents the received msg from the transport. All transport
66// protocol specific info has been removed.
67type recvMsg struct {
68 buffer *bytes.Buffer
69 // nil: received some data
70 // io.EOF: stream is completed. data is nil.
71 // other non-nil error: transport failure. data is nil.
72 err error
73}
74
75// recvBuffer is an unbounded channel of recvMsg structs.
76//
77// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
78// holds a channel of recvMsg structs instead of objects implementing "item"
79// interface. recvBuffer is written to much more often and using strict recvMsg
80// structs helps avoid allocation in "recvBuffer.put"
81type recvBuffer struct {
82 c chan recvMsg
83 mu sync.Mutex
84 backlog []recvMsg
85 err error
86}
87
88func newRecvBuffer() *recvBuffer {
89 b := &recvBuffer{
90 c: make(chan recvMsg, 1),
91 }
92 return b
93}
94
95func (b *recvBuffer) put(r recvMsg) {
96 b.mu.Lock()
97 if b.err != nil {
98 b.mu.Unlock()
99 // An error had occurred earlier, don't accept more
100 // data or errors.
101 return
102 }
103 b.err = r.err
104 if len(b.backlog) == 0 {
105 select {
106 case b.c <- r:
107 b.mu.Unlock()
108 return
109 default:
110 }
111 }
112 b.backlog = append(b.backlog, r)
113 b.mu.Unlock()
114}
115
116func (b *recvBuffer) load() {
117 b.mu.Lock()
118 if len(b.backlog) > 0 {
119 select {
120 case b.c <- b.backlog[0]:
121 b.backlog[0] = recvMsg{}
122 b.backlog = b.backlog[1:]
123 default:
124 }
125 }
126 b.mu.Unlock()
127}
128
129// get returns the channel that receives a recvMsg in the buffer.
130//
131// Upon receipt of a recvMsg, the caller should call load to send another
132// recvMsg onto the channel if there is any.
133func (b *recvBuffer) get() <-chan recvMsg {
134 return b.c
135}
136
137// recvBufferReader implements io.Reader interface to read the data from
138// recvBuffer.
139type recvBufferReader struct {
140 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
141 ctx context.Context
142 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
143 recv *recvBuffer
144 last *bytes.Buffer // Stores the remaining data in the previous calls.
145 err error
146 freeBuffer func(*bytes.Buffer)
147}
148
149// Read reads the next len(p) bytes from last. If last is drained, it tries to
150// read additional data from recv. It blocks if there no additional data available
151// in recv. If Read returns any non-nil error, it will continue to return that error.
152func (r *recvBufferReader) Read(p []byte) (n int, err error) {
153 if r.err != nil {
154 return 0, r.err
155 }
156 if r.last != nil {
157 // Read remaining data left in last call.
158 copied, _ := r.last.Read(p)
159 if r.last.Len() == 0 {
160 r.freeBuffer(r.last)
161 r.last = nil
162 }
163 return copied, nil
164 }
165 if r.closeStream != nil {
166 n, r.err = r.readClient(p)
167 } else {
168 n, r.err = r.read(p)
169 }
170 return n, r.err
171}
172
173func (r *recvBufferReader) read(p []byte) (n int, err error) {
174 select {
175 case <-r.ctxDone:
176 return 0, ContextErr(r.ctx.Err())
177 case m := <-r.recv.get():
178 return r.readAdditional(m, p)
179 }
180}
181
182func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
183 // If the context is canceled, then closes the stream with nil metadata.
184 // closeStream writes its error parameter to r.recv as a recvMsg.
185 // r.readAdditional acts on that message and returns the necessary error.
186 select {
187 case <-r.ctxDone:
188 // Note that this adds the ctx error to the end of recv buffer, and
189 // reads from the head. This will delay the error until recv buffer is
190 // empty, thus will delay ctx cancellation in Recv().
191 //
192 // It's done this way to fix a race between ctx cancel and trailer. The
193 // race was, stream.Recv() may return ctx error if ctxDone wins the
194 // race, but stream.Trailer() may return a non-nil md because the stream
195 // was not marked as done when trailer is received. This closeStream
196 // call will mark stream as done, thus fix the race.
197 //
198 // TODO: delaying ctx error seems like a unnecessary side effect. What
199 // we really want is to mark the stream as done, and return ctx error
200 // faster.
201 r.closeStream(ContextErr(r.ctx.Err()))
202 m := <-r.recv.get()
203 return r.readAdditional(m, p)
204 case m := <-r.recv.get():
205 return r.readAdditional(m, p)
206 }
207}
208
209func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
210 r.recv.load()
211 if m.err != nil {
212 return 0, m.err
213 }
214 copied, _ := m.buffer.Read(p)
215 if m.buffer.Len() == 0 {
216 r.freeBuffer(m.buffer)
217 r.last = nil
218 } else {
219 r.last = m.buffer
220 }
221 return copied, nil
222}
223
224type streamState uint32
225
226const (
227 streamActive streamState = iota
228 streamWriteDone // EndStream sent
229 streamReadDone // EndStream received
230 streamDone // the entire stream is finished.
231)
232
233// Stream represents an RPC in the transport layer.
234type Stream struct {
235 id uint32
236 st ServerTransport // nil for client side Stream
237 ct *http2Client // nil for server side Stream
238 ctx context.Context // the associated context of the stream
239 cancel context.CancelFunc // always nil for client side Stream
240 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
241 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
242 method string // the associated RPC method of the stream
243 recvCompress string
244 sendCompress string
245 buf *recvBuffer
246 trReader io.Reader
247 fc *inFlow
248 wq *writeQuota
249
250 // Callback to state application's intentions to read data. This
251 // is used to adjust flow control, if needed.
252 requestRead func(int)
253
254 headerChan chan struct{} // closed to indicate the end of header metadata.
255 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
256 // headerValid indicates whether a valid header was received. Only
257 // meaningful after headerChan is closed (always call waitOnHeader() before
258 // reading its value). Not valid on server side.
259 headerValid bool
260
261 // hdrMu protects header and trailer metadata on the server-side.
262 hdrMu sync.Mutex
263 // On client side, header keeps the received header metadata.
264 //
265 // On server side, header keeps the header set by SetHeader(). The complete
266 // header will merged into this after t.WriteHeader() is called.
267 header metadata.MD
268 trailer metadata.MD // the key-value map of trailer metadata.
269
270 noHeaders bool // set if the client never received headers (set only after the stream is done).
271
272 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
273 headerSent uint32
274
275 state streamState
276
277 // On client-side it is the status error received from the server.
278 // On server-side it is unused.
279 status *status.Status
280
281 bytesReceived uint32 // indicates whether any bytes have been received on this stream
282 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
283
284 // contentSubtype is the content-subtype for requests.
285 // this must be lowercase or the behavior is undefined.
286 contentSubtype string
287}
288
289// isHeaderSent is only valid on the server-side.
290func (s *Stream) isHeaderSent() bool {
291 return atomic.LoadUint32(&s.headerSent) == 1
292}
293
294// updateHeaderSent updates headerSent and returns true
295// if it was alreay set. It is valid only on server-side.
296func (s *Stream) updateHeaderSent() bool {
297 return atomic.SwapUint32(&s.headerSent, 1) == 1
298}
299
300func (s *Stream) swapState(st streamState) streamState {
301 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
302}
303
304func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
305 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
306}
307
308func (s *Stream) getState() streamState {
309 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
310}
311
312func (s *Stream) waitOnHeader() {
313 if s.headerChan == nil {
314 // On the server headerChan is always nil since a stream originates
315 // only after having received headers.
316 return
317 }
318 select {
319 case <-s.ctx.Done():
320 // Close the stream to prevent headers/trailers from changing after
321 // this function returns.
322 s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
323 // headerChan could possibly not be closed yet if closeStream raced
324 // with operateHeaders; wait until it is closed explicitly here.
325 <-s.headerChan
326 case <-s.headerChan:
327 }
328}
329
330// RecvCompress returns the compression algorithm applied to the inbound
331// message. It is empty string if there is no compression applied.
332func (s *Stream) RecvCompress() string {
333 s.waitOnHeader()
334 return s.recvCompress
335}
336
337// SetSendCompress sets the compression algorithm to the stream.
338func (s *Stream) SetSendCompress(str string) {
339 s.sendCompress = str
340}
341
342// Done returns a channel which is closed when it receives the final status
343// from the server.
344func (s *Stream) Done() <-chan struct{} {
345 return s.done
346}
347
348// Header returns the header metadata of the stream.
349//
350// On client side, it acquires the key-value pairs of header metadata once it is
351// available. It blocks until i) the metadata is ready or ii) there is no header
352// metadata or iii) the stream is canceled/expired.
353//
354// On server side, it returns the out header after t.WriteHeader is called. It
355// does not block and must not be called until after WriteHeader.
356func (s *Stream) Header() (metadata.MD, error) {
357 if s.headerChan == nil {
358 // On server side, return the header in stream. It will be the out
359 // header after t.WriteHeader is called.
360 return s.header.Copy(), nil
361 }
362 s.waitOnHeader()
363 if !s.headerValid {
364 return nil, s.status.Err()
365 }
366 return s.header.Copy(), nil
367}
368
369// TrailersOnly blocks until a header or trailers-only frame is received and
370// then returns true if the stream was trailers-only. If the stream ends
371// before headers are received, returns true, nil. Client-side only.
372func (s *Stream) TrailersOnly() bool {
373 s.waitOnHeader()
374 return s.noHeaders
375}
376
377// Trailer returns the cached trailer metedata. Note that if it is not called
378// after the entire stream is done, it could return an empty MD. Client
379// side only.
380// It can be safely read only after stream has ended that is either read
381// or write have returned io.EOF.
382func (s *Stream) Trailer() metadata.MD {
383 c := s.trailer.Copy()
384 return c
385}
386
387// ContentSubtype returns the content-subtype for a request. For example, a
388// content-subtype of "proto" will result in a content-type of
389// "application/grpc+proto". This will always be lowercase. See
390// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
391// more details.
392func (s *Stream) ContentSubtype() string {
393 return s.contentSubtype
394}
395
396// Context returns the context of the stream.
397func (s *Stream) Context() context.Context {
398 return s.ctx
399}
400
401// Method returns the method for the stream.
402func (s *Stream) Method() string {
403 return s.method
404}
405
406// Status returns the status received from the server.
407// Status can be read safely only after the stream has ended,
408// that is, after Done() is closed.
409func (s *Stream) Status() *status.Status {
410 return s.status
411}
412
413// SetHeader sets the header metadata. This can be called multiple times.
414// Server side only.
415// This should not be called in parallel to other data writes.
416func (s *Stream) SetHeader(md metadata.MD) error {
417 if md.Len() == 0 {
418 return nil
419 }
420 if s.isHeaderSent() || s.getState() == streamDone {
421 return ErrIllegalHeaderWrite
422 }
423 s.hdrMu.Lock()
424 s.header = metadata.Join(s.header, md)
425 s.hdrMu.Unlock()
426 return nil
427}
428
429// SendHeader sends the given header metadata. The given metadata is
430// combined with any metadata set by previous calls to SetHeader and
431// then written to the transport stream.
432func (s *Stream) SendHeader(md metadata.MD) error {
433 return s.st.WriteHeader(s, md)
434}
435
436// SetTrailer sets the trailer metadata which will be sent with the RPC status
437// by the server. This can be called multiple times. Server side only.
438// This should not be called parallel to other data writes.
439func (s *Stream) SetTrailer(md metadata.MD) error {
440 if md.Len() == 0 {
441 return nil
442 }
443 if s.getState() == streamDone {
444 return ErrIllegalHeaderWrite
445 }
446 s.hdrMu.Lock()
447 s.trailer = metadata.Join(s.trailer, md)
448 s.hdrMu.Unlock()
449 return nil
450}
451
452func (s *Stream) write(m recvMsg) {
453 s.buf.put(m)
454}
455
456// Read reads all p bytes from the wire for this stream.
457func (s *Stream) Read(p []byte) (n int, err error) {
458 // Don't request a read if there was an error earlier
459 if er := s.trReader.(*transportReader).er; er != nil {
460 return 0, er
461 }
462 s.requestRead(len(p))
463 return io.ReadFull(s.trReader, p)
464}
465
466// tranportReader reads all the data available for this Stream from the transport and
467// passes them into the decoder, which converts them into a gRPC message stream.
468// The error is io.EOF when the stream is done or another non-nil error if
469// the stream broke.
470type transportReader struct {
471 reader io.Reader
472 // The handler to control the window update procedure for both this
473 // particular stream and the associated transport.
474 windowHandler func(int)
475 er error
476}
477
478func (t *transportReader) Read(p []byte) (n int, err error) {
479 n, err = t.reader.Read(p)
480 if err != nil {
481 t.er = err
482 return
483 }
484 t.windowHandler(n)
485 return
486}
487
488// BytesReceived indicates whether any bytes have been received on this stream.
489func (s *Stream) BytesReceived() bool {
490 return atomic.LoadUint32(&s.bytesReceived) == 1
491}
492
493// Unprocessed indicates whether the server did not process this stream --
494// i.e. it sent a refused stream or GOAWAY including this stream ID.
495func (s *Stream) Unprocessed() bool {
496 return atomic.LoadUint32(&s.unprocessed) == 1
497}
498
499// GoString is implemented by Stream so context.String() won't
500// race when printing %#v.
501func (s *Stream) GoString() string {
502 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
503}
504
505// state of transport
506type transportState int
507
508const (
509 reachable transportState = iota
510 closing
511 draining
512)
513
514// ServerConfig consists of all the configurations to establish a server transport.
515type ServerConfig struct {
516 MaxStreams uint32
517 AuthInfo credentials.AuthInfo
518 InTapHandle tap.ServerInHandle
519 StatsHandler stats.Handler
520 KeepaliveParams keepalive.ServerParameters
521 KeepalivePolicy keepalive.EnforcementPolicy
522 InitialWindowSize int32
523 InitialConnWindowSize int32
524 WriteBufferSize int
525 ReadBufferSize int
526 ChannelzParentID int64
527 MaxHeaderListSize *uint32
528 HeaderTableSize *uint32
529}
530
531// NewServerTransport creates a ServerTransport with conn or non-nil error
532// if it fails.
533func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
534 return newHTTP2Server(conn, config)
535}
536
537// ConnectOptions covers all relevant options for communicating with the server.
538type ConnectOptions struct {
539 // UserAgent is the application user agent.
540 UserAgent string
541 // Dialer specifies how to dial a network address.
542 Dialer func(context.Context, string) (net.Conn, error)
543 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
544 FailOnNonTempDialError bool
545 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
546 PerRPCCredentials []credentials.PerRPCCredentials
547 // TransportCredentials stores the Authenticator required to setup a client
548 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
549 TransportCredentials credentials.TransportCredentials
550 // CredsBundle is the credentials bundle to be used. Only one of
551 // TransportCredentials and CredsBundle is non-nil.
552 CredsBundle credentials.Bundle
553 // KeepaliveParams stores the keepalive parameters.
554 KeepaliveParams keepalive.ClientParameters
555 // StatsHandler stores the handler for stats.
556 StatsHandler stats.Handler
557 // InitialWindowSize sets the initial window size for a stream.
558 InitialWindowSize int32
559 // InitialConnWindowSize sets the initial window size for a connection.
560 InitialConnWindowSize int32
561 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
562 WriteBufferSize int
563 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
564 ReadBufferSize int
565 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
566 ChannelzParentID int64
567 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
568 MaxHeaderListSize *uint32
569}
570
571// TargetInfo contains the information of the target such as network address and metadata.
572type TargetInfo struct {
573 Addr string
574 Metadata interface{}
575 Authority string
576}
577
578// NewClientTransport establishes the transport with the required ConnectOptions
579// and returns it to the caller.
580func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
581 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
582}
583
584// Options provides additional hints and information for message
585// transmission.
586type Options struct {
587 // Last indicates whether this write is the last piece for
588 // this stream.
589 Last bool
590}
591
592// CallHdr carries the information of a particular RPC.
593type CallHdr struct {
594 // Host specifies the peer's host.
595 Host string
596
597 // Method specifies the operation to perform.
598 Method string
599
600 // SendCompress specifies the compression algorithm applied on
601 // outbound message.
602 SendCompress string
603
604 // Creds specifies credentials.PerRPCCredentials for a call.
605 Creds credentials.PerRPCCredentials
606
607 // ContentSubtype specifies the content-subtype for a request. For example, a
608 // content-subtype of "proto" will result in a content-type of
609 // "application/grpc+proto". The value of ContentSubtype must be all
610 // lowercase, otherwise the behavior is undefined. See
611 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
612 // for more details.
613 ContentSubtype string
614
615 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
616}
617
618// ClientTransport is the common interface for all gRPC client-side transport
619// implementations.
620type ClientTransport interface {
621 // Close tears down this transport. Once it returns, the transport
622 // should not be accessed any more. The caller must make sure this
623 // is called only once.
624 Close() error
625
626 // GracefulClose starts to tear down the transport: the transport will stop
627 // accepting new RPCs and NewStream will return error. Once all streams are
628 // finished, the transport will close.
629 //
630 // It does not block.
631 GracefulClose()
632
633 // Write sends the data for the given stream. A nil stream indicates
634 // the write is to be performed on the transport as a whole.
635 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
636
637 // NewStream creates a Stream for an RPC.
638 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
639
640 // CloseStream clears the footprint of a stream when the stream is
641 // not needed any more. The err indicates the error incurred when
642 // CloseStream is called. Must be called when a stream is finished
643 // unless the associated transport is closing.
644 CloseStream(stream *Stream, err error)
645
646 // Error returns a channel that is closed when some I/O error
647 // happens. Typically the caller should have a goroutine to monitor
648 // this in order to take action (e.g., close the current transport
649 // and create a new one) in error case. It should not return nil
650 // once the transport is initiated.
651 Error() <-chan struct{}
652
653 // GoAway returns a channel that is closed when ClientTransport
654 // receives the draining signal from the server (e.g., GOAWAY frame in
655 // HTTP/2).
656 GoAway() <-chan struct{}
657
658 // GetGoAwayReason returns the reason why GoAway frame was received.
659 GetGoAwayReason() GoAwayReason
660
661 // RemoteAddr returns the remote network address.
662 RemoteAddr() net.Addr
663
664 // IncrMsgSent increments the number of message sent through this transport.
665 IncrMsgSent()
666
667 // IncrMsgRecv increments the number of message received through this transport.
668 IncrMsgRecv()
669}
670
671// ServerTransport is the common interface for all gRPC server-side transport
672// implementations.
673//
674// Methods may be called concurrently from multiple goroutines, but
675// Write methods for a given Stream will be called serially.
676type ServerTransport interface {
677 // HandleStreams receives incoming streams using the given handler.
678 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
679
680 // WriteHeader sends the header metadata for the given stream.
681 // WriteHeader may not be called on all streams.
682 WriteHeader(s *Stream, md metadata.MD) error
683
684 // Write sends the data for the given stream.
685 // Write may not be called on all streams.
686 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
687
688 // WriteStatus sends the status of a stream to the client. WriteStatus is
689 // the final call made on a stream and always occurs.
690 WriteStatus(s *Stream, st *status.Status) error
691
692 // Close tears down the transport. Once it is called, the transport
693 // should not be accessed any more. All the pending streams and their
694 // handlers will be terminated asynchronously.
695 Close() error
696
697 // RemoteAddr returns the remote network address.
698 RemoteAddr() net.Addr
699
700 // Drain notifies the client this ServerTransport stops accepting new RPCs.
701 Drain()
702
703 // IncrMsgSent increments the number of message sent through this transport.
704 IncrMsgSent()
705
706 // IncrMsgRecv increments the number of message received through this transport.
707 IncrMsgRecv()
708}
709
710// connectionErrorf creates an ConnectionError with the specified error description.
711func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
712 return ConnectionError{
713 Desc: fmt.Sprintf(format, a...),
714 temp: temp,
715 err: e,
716 }
717}
718
719// ConnectionError is an error that results in the termination of the
720// entire connection and the retry of all the active streams.
721type ConnectionError struct {
722 Desc string
723 temp bool
724 err error
725}
726
727func (e ConnectionError) Error() string {
728 return fmt.Sprintf("connection error: desc = %q", e.Desc)
729}
730
731// Temporary indicates if this connection error is temporary or fatal.
732func (e ConnectionError) Temporary() bool {
733 return e.temp
734}
735
736// Origin returns the original error of this connection error.
737func (e ConnectionError) Origin() error {
738 // Never return nil error here.
739 // If the original error is nil, return itself.
740 if e.err == nil {
741 return e
742 }
743 return e.err
744}
745
746var (
747 // ErrConnClosing indicates that the transport is closing.
748 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
749 // errStreamDrain indicates that the stream is rejected because the
750 // connection is draining. This could be caused by goaway or balancer
751 // removing the address.
752 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
753 // errStreamDone is returned from write at the client side to indiacte application
754 // layer of an error.
755 errStreamDone = errors.New("the stream is done")
756 // StatusGoAway indicates that the server sent a GOAWAY that included this
757 // stream's ID in unprocessed RPCs.
758 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
759)
760
761// GoAwayReason contains the reason for the GoAway frame received.
762type GoAwayReason uint8
763
764const (
765 // GoAwayInvalid indicates that no GoAway frame is received.
766 GoAwayInvalid GoAwayReason = 0
767 // GoAwayNoReason is the default value when GoAway frame is received.
768 GoAwayNoReason GoAwayReason = 1
769 // GoAwayTooManyPings indicates that a GoAway frame with
770 // ErrCodeEnhanceYourCalm was received and that the debug data said
771 // "too_many_pings".
772 GoAwayTooManyPings GoAwayReason = 2
773)
774
775// channelzData is used to store channelz related data for http2Client and http2Server.
776// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
777// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
778// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
779type channelzData struct {
780 kpCount int64
781 // The number of streams that have started, including already finished ones.
782 streamsStarted int64
783 // Client side: The number of streams that have ended successfully by receiving
784 // EoS bit set frame from server.
785 // Server side: The number of streams that have ended successfully by sending
786 // frame with EoS bit set.
787 streamsSucceeded int64
788 streamsFailed int64
789 // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
790 // instead of time.Time since it's more costly to atomically update time.Time variable than int64
791 // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
792 lastStreamCreatedTime int64
793 msgSent int64
794 msgRecv int64
795 lastMsgSentTime int64
796 lastMsgRecvTime int64
797}
798
799// ContextErr converts the error from context package into a status error.
800func ContextErr(err error) error {
801 switch err {
802 case context.DeadlineExceeded:
803 return status.Error(codes.DeadlineExceeded, err.Error())
804 case context.Canceled:
805 return status.Error(codes.Canceled, err.Error())
806 }
807 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
808}