blob: 0f33c9ca8f777885829ce0876745a18be0470d6b [file] [log] [blame]
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
// Package transport defines and implements a message-oriented communication
// channel to complete various transactions (e.g., an RPC). It is meant for
// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
Abhilash S.L3b494632019-07-16 15:51:09 +053025 "bytes"
William Kurkianea869482019-04-09 15:16:11 -040026 "context"
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/stats"
39 "google.golang.org/grpc/status"
40 "google.golang.org/grpc/tap"
41)
42
// bufferPool is a pool of reusable *bytes.Buffer values backed by a sync.Pool.
type bufferPool struct {
	pool sync.Pool
}

// newBufferPool returns a bufferPool that allocates a fresh, empty
// bytes.Buffer whenever the underlying pool has nothing to hand out.
func newBufferPool() *bufferPool {
	return &bufferPool{
		pool: sync.Pool{
			New: func() interface{} {
				return new(bytes.Buffer)
			},
		},
	}
}

// get returns a buffer from the pool, allocating a new one if none is
// available. Buffers handed out are always empty because put resets them.
func (p *bufferPool) get() *bytes.Buffer {
	return p.pool.Get().(*bytes.Buffer)
}

// put hands b back to the pool for reuse.
//
// The buffer is reset first so that unread bytes from a previous user can
// never be observed by whoever receives it from get next. A nil buffer is
// ignored; storing it would make a later get return a nil pointer.
func (p *bufferPool) put(b *bytes.Buffer) {
	if b == nil {
		return
	}
	b.Reset()
	p.pool.Put(b)
}
64
// recvMsg is a single item handed from the transport to a stream reader, with
// all transport-protocol specific information already stripped.
type recvMsg struct {
	buffer *bytes.Buffer
	// err classifies the message:
	//   nil:             some data was received.
	//   io.EOF:          the stream is completed; there is no data.
	//   any other error: the transport failed; there is no data.
	err error
}

// recvBuffer is an unbounded channel of recvMsg structs.
//
// It differs from controlBuffer only in that it carries concrete recvMsg
// values rather than objects implementing the "item" interface. recvBuffer is
// written to far more often than controlBuffer, and sticking to plain recvMsg
// structs helps avoid an allocation in put.
type recvBuffer struct {
	c       chan recvMsg
	mu      sync.Mutex
	backlog []recvMsg
	err     error
}

// newRecvBuffer returns an empty recvBuffer whose channel can hold one
// message; anything beyond that is kept in the backlog.
func newRecvBuffer() *recvBuffer {
	return &recvBuffer{c: make(chan recvMsg, 1)}
}

// put enqueues r. Once a message carrying a non-nil error has been accepted,
// every later message (data or error) is silently dropped.
func (b *recvBuffer) put(r recvMsg) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.err != nil {
		// An error was recorded earlier; accept nothing more.
		return
	}
	b.err = r.err

	// Messages already waiting in the backlog must be delivered first, so
	// the channel fast path is only tried when the backlog is empty.
	if len(b.backlog) > 0 {
		b.backlog = append(b.backlog, r)
		return
	}
	select {
	case b.c <- r:
	default:
		b.backlog = append(b.backlog, r)
	}
}

// load moves the oldest backlog entry onto the channel if the channel has
// room. It does nothing when the backlog is empty or the channel is full.
func (b *recvBuffer) load() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) == 0 {
		return
	}
	select {
	case b.c <- b.backlog[0]:
		// Clear the slot so the backing array does not keep the message
		// (and its buffer) reachable.
		b.backlog[0] = recvMsg{}
		b.backlog = b.backlog[1:]
	default:
	}
}

// get returns the channel on which buffered recvMsg values are delivered.
//
// After receiving a recvMsg from it, the caller should call load so that the
// next pending message, if any, is pushed onto the channel.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
135
136// recvBufferReader implements io.Reader interface to read the data from
137// recvBuffer.
138type recvBufferReader struct {
139 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
140 ctx context.Context
141 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
142 recv *recvBuffer
Abhilash S.L3b494632019-07-16 15:51:09 +0530143 last *bytes.Buffer // Stores the remaining data in the previous calls.
William Kurkianea869482019-04-09 15:16:11 -0400144 err error
Abhilash S.L3b494632019-07-16 15:51:09 +0530145 freeBuffer func(*bytes.Buffer)
William Kurkianea869482019-04-09 15:16:11 -0400146}
147
148// Read reads the next len(p) bytes from last. If last is drained, it tries to
149// read additional data from recv. It blocks if there no additional data available
150// in recv. If Read returns any non-nil error, it will continue to return that error.
151func (r *recvBufferReader) Read(p []byte) (n int, err error) {
152 if r.err != nil {
153 return 0, r.err
154 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530155 if r.last != nil {
William Kurkianea869482019-04-09 15:16:11 -0400156 // Read remaining data left in last call.
Abhilash S.L3b494632019-07-16 15:51:09 +0530157 copied, _ := r.last.Read(p)
158 if r.last.Len() == 0 {
159 r.freeBuffer(r.last)
160 r.last = nil
161 }
William Kurkianea869482019-04-09 15:16:11 -0400162 return copied, nil
163 }
164 if r.closeStream != nil {
165 n, r.err = r.readClient(p)
166 } else {
167 n, r.err = r.read(p)
168 }
169 return n, r.err
170}
171
172func (r *recvBufferReader) read(p []byte) (n int, err error) {
173 select {
174 case <-r.ctxDone:
175 return 0, ContextErr(r.ctx.Err())
176 case m := <-r.recv.get():
177 return r.readAdditional(m, p)
178 }
179}
180
181func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
182 // If the context is canceled, then closes the stream with nil metadata.
183 // closeStream writes its error parameter to r.recv as a recvMsg.
184 // r.readAdditional acts on that message and returns the necessary error.
185 select {
186 case <-r.ctxDone:
187 r.closeStream(ContextErr(r.ctx.Err()))
188 m := <-r.recv.get()
189 return r.readAdditional(m, p)
190 case m := <-r.recv.get():
191 return r.readAdditional(m, p)
192 }
193}
194
195func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
196 r.recv.load()
197 if m.err != nil {
198 return 0, m.err
199 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530200 copied, _ := m.buffer.Read(p)
201 if m.buffer.Len() == 0 {
202 r.freeBuffer(m.buffer)
203 r.last = nil
204 } else {
205 r.last = m.buffer
206 }
William Kurkianea869482019-04-09 15:16:11 -0400207 return copied, nil
208}
209
// streamState describes how far a Stream has progressed. It is stored as a
// uint32 so it can be accessed with the sync/atomic helpers.
type streamState uint32

const (
	// streamActive is the initial state.
	streamActive streamState = iota
	// streamWriteDone means EndStream has been sent.
	streamWriteDone
	// streamReadDone means EndStream has been received.
	streamReadDone
	// streamDone means the entire stream is finished.
	streamDone
)
218
219// Stream represents an RPC in the transport layer.
220type Stream struct {
221 id uint32
222 st ServerTransport // nil for client side Stream
223 ctx context.Context // the associated context of the stream
224 cancel context.CancelFunc // always nil for client side Stream
225 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
226 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
227 method string // the associated RPC method of the stream
228 recvCompress string
229 sendCompress string
230 buf *recvBuffer
231 trReader io.Reader
232 fc *inFlow
233 wq *writeQuota
234
235 // Callback to state application's intentions to read data. This
236 // is used to adjust flow control, if needed.
237 requestRead func(int)
238
Abhilash S.L3b494632019-07-16 15:51:09 +0530239 headerChan chan struct{} // closed to indicate the end of header metadata.
240 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
William Kurkianea869482019-04-09 15:16:11 -0400241
242 // hdrMu protects header and trailer metadata on the server-side.
243 hdrMu sync.Mutex
244 // On client side, header keeps the received header metadata.
245 //
246 // On server side, header keeps the header set by SetHeader(). The complete
247 // header will merged into this after t.WriteHeader() is called.
248 header metadata.MD
249 trailer metadata.MD // the key-value map of trailer metadata.
250
251 noHeaders bool // set if the client never received headers (set only after the stream is done).
252
253 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
254 headerSent uint32
255
256 state streamState
257
258 // On client-side it is the status error received from the server.
259 // On server-side it is unused.
260 status *status.Status
261
262 bytesReceived uint32 // indicates whether any bytes have been received on this stream
263 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
264
265 // contentSubtype is the content-subtype for requests.
266 // this must be lowercase or the behavior is undefined.
267 contentSubtype string
268}
269
270// isHeaderSent is only valid on the server-side.
271func (s *Stream) isHeaderSent() bool {
272 return atomic.LoadUint32(&s.headerSent) == 1
273}
274
275// updateHeaderSent updates headerSent and returns true
276// if it was alreay set. It is valid only on server-side.
277func (s *Stream) updateHeaderSent() bool {
278 return atomic.SwapUint32(&s.headerSent, 1) == 1
279}
280
281func (s *Stream) swapState(st streamState) streamState {
282 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
283}
284
285func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
286 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
287}
288
289func (s *Stream) getState() streamState {
290 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
291}
292
293func (s *Stream) waitOnHeader() error {
294 if s.headerChan == nil {
295 // On the server headerChan is always nil since a stream originates
296 // only after having received headers.
297 return nil
298 }
299 select {
300 case <-s.ctx.Done():
301 return ContextErr(s.ctx.Err())
302 case <-s.headerChan:
303 return nil
304 }
305}
306
307// RecvCompress returns the compression algorithm applied to the inbound
308// message. It is empty string if there is no compression applied.
309func (s *Stream) RecvCompress() string {
310 if err := s.waitOnHeader(); err != nil {
311 return ""
312 }
313 return s.recvCompress
314}
315
316// SetSendCompress sets the compression algorithm to the stream.
317func (s *Stream) SetSendCompress(str string) {
318 s.sendCompress = str
319}
320
321// Done returns a channel which is closed when it receives the final status
322// from the server.
323func (s *Stream) Done() <-chan struct{} {
324 return s.done
325}
326
327// Header returns the header metadata of the stream.
328//
329// On client side, it acquires the key-value pairs of header metadata once it is
330// available. It blocks until i) the metadata is ready or ii) there is no header
331// metadata or iii) the stream is canceled/expired.
332//
333// On server side, it returns the out header after t.WriteHeader is called.
334func (s *Stream) Header() (metadata.MD, error) {
335 if s.headerChan == nil && s.header != nil {
336 // On server side, return the header in stream. It will be the out
337 // header after t.WriteHeader is called.
338 return s.header.Copy(), nil
339 }
340 err := s.waitOnHeader()
341 // Even if the stream is closed, header is returned if available.
342 select {
343 case <-s.headerChan:
344 if s.header == nil {
345 return nil, nil
346 }
347 return s.header.Copy(), nil
348 default:
349 }
350 return nil, err
351}
352
353// TrailersOnly blocks until a header or trailers-only frame is received and
354// then returns true if the stream was trailers-only. If the stream ends
355// before headers are received, returns true, nil. If a context error happens
356// first, returns it as a status error. Client-side only.
357func (s *Stream) TrailersOnly() (bool, error) {
358 err := s.waitOnHeader()
359 if err != nil {
360 return false, err
361 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530362 return s.noHeaders, nil
William Kurkianea869482019-04-09 15:16:11 -0400363}
364
365// Trailer returns the cached trailer metedata. Note that if it is not called
366// after the entire stream is done, it could return an empty MD. Client
367// side only.
368// It can be safely read only after stream has ended that is either read
369// or write have returned io.EOF.
370func (s *Stream) Trailer() metadata.MD {
371 c := s.trailer.Copy()
372 return c
373}
374
375// ContentSubtype returns the content-subtype for a request. For example, a
376// content-subtype of "proto" will result in a content-type of
377// "application/grpc+proto". This will always be lowercase. See
378// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
379// more details.
380func (s *Stream) ContentSubtype() string {
381 return s.contentSubtype
382}
383
384// Context returns the context of the stream.
385func (s *Stream) Context() context.Context {
386 return s.ctx
387}
388
389// Method returns the method for the stream.
390func (s *Stream) Method() string {
391 return s.method
392}
393
394// Status returns the status received from the server.
395// Status can be read safely only after the stream has ended,
396// that is, after Done() is closed.
397func (s *Stream) Status() *status.Status {
398 return s.status
399}
400
401// SetHeader sets the header metadata. This can be called multiple times.
402// Server side only.
403// This should not be called in parallel to other data writes.
404func (s *Stream) SetHeader(md metadata.MD) error {
405 if md.Len() == 0 {
406 return nil
407 }
408 if s.isHeaderSent() || s.getState() == streamDone {
409 return ErrIllegalHeaderWrite
410 }
411 s.hdrMu.Lock()
412 s.header = metadata.Join(s.header, md)
413 s.hdrMu.Unlock()
414 return nil
415}
416
417// SendHeader sends the given header metadata. The given metadata is
418// combined with any metadata set by previous calls to SetHeader and
419// then written to the transport stream.
420func (s *Stream) SendHeader(md metadata.MD) error {
421 return s.st.WriteHeader(s, md)
422}
423
424// SetTrailer sets the trailer metadata which will be sent with the RPC status
425// by the server. This can be called multiple times. Server side only.
426// This should not be called parallel to other data writes.
427func (s *Stream) SetTrailer(md metadata.MD) error {
428 if md.Len() == 0 {
429 return nil
430 }
431 if s.getState() == streamDone {
432 return ErrIllegalHeaderWrite
433 }
434 s.hdrMu.Lock()
435 s.trailer = metadata.Join(s.trailer, md)
436 s.hdrMu.Unlock()
437 return nil
438}
439
440func (s *Stream) write(m recvMsg) {
441 s.buf.put(m)
442}
443
444// Read reads all p bytes from the wire for this stream.
445func (s *Stream) Read(p []byte) (n int, err error) {
446 // Don't request a read if there was an error earlier
447 if er := s.trReader.(*transportReader).er; er != nil {
448 return 0, er
449 }
450 s.requestRead(len(p))
451 return io.ReadFull(s.trReader, p)
452}
453
// transportReader reads all the data available for this Stream from the
// transport and passes it on to the decoder, which converts it into a gRPC
// message stream. The error is io.EOF when the stream is done, or another
// non-nil error if the stream broke.
type transportReader struct {
	reader io.Reader
	// windowHandler controls the window update procedure for both this
	// particular stream and the associated transport. It is called with the
	// byte count of every successful read.
	windowHandler func(int)
	// er is the most recent error returned by reader.
	er error
}

// Read reads from the wrapped reader. A failed read is remembered in er and
// returned as is, without touching the window; a successful read reports the
// number of bytes read to windowHandler.
func (t *transportReader) Read(p []byte) (n int, err error) {
	if n, err = t.reader.Read(p); err != nil {
		t.er = err
		return n, err
	}
	t.windowHandler(n)
	return n, nil
}
475
476// BytesReceived indicates whether any bytes have been received on this stream.
477func (s *Stream) BytesReceived() bool {
478 return atomic.LoadUint32(&s.bytesReceived) == 1
479}
480
481// Unprocessed indicates whether the server did not process this stream --
482// i.e. it sent a refused stream or GOAWAY including this stream ID.
483func (s *Stream) Unprocessed() bool {
484 return atomic.LoadUint32(&s.unprocessed) == 1
485}
486
487// GoString is implemented by Stream so context.String() won't
488// race when printing %#v.
489func (s *Stream) GoString() string {
490 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
491}
492
// transportState is the state of a transport.
type transportState int

// The transport states; reachable is the zero value.
const (
	reachable transportState = iota
	closing
	draining
)
501
502// ServerConfig consists of all the configurations to establish a server transport.
503type ServerConfig struct {
504 MaxStreams uint32
505 AuthInfo credentials.AuthInfo
506 InTapHandle tap.ServerInHandle
507 StatsHandler stats.Handler
508 KeepaliveParams keepalive.ServerParameters
509 KeepalivePolicy keepalive.EnforcementPolicy
510 InitialWindowSize int32
511 InitialConnWindowSize int32
512 WriteBufferSize int
513 ReadBufferSize int
514 ChannelzParentID int64
515 MaxHeaderListSize *uint32
516}
517
518// NewServerTransport creates a ServerTransport with conn or non-nil error
519// if it fails.
520func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
521 return newHTTP2Server(conn, config)
522}
523
524// ConnectOptions covers all relevant options for communicating with the server.
525type ConnectOptions struct {
526 // UserAgent is the application user agent.
527 UserAgent string
528 // Dialer specifies how to dial a network address.
529 Dialer func(context.Context, string) (net.Conn, error)
530 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
531 FailOnNonTempDialError bool
532 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
533 PerRPCCredentials []credentials.PerRPCCredentials
534 // TransportCredentials stores the Authenticator required to setup a client
535 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
536 TransportCredentials credentials.TransportCredentials
537 // CredsBundle is the credentials bundle to be used. Only one of
538 // TransportCredentials and CredsBundle is non-nil.
539 CredsBundle credentials.Bundle
540 // KeepaliveParams stores the keepalive parameters.
541 KeepaliveParams keepalive.ClientParameters
542 // StatsHandler stores the handler for stats.
543 StatsHandler stats.Handler
544 // InitialWindowSize sets the initial window size for a stream.
545 InitialWindowSize int32
546 // InitialConnWindowSize sets the initial window size for a connection.
547 InitialConnWindowSize int32
548 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
549 WriteBufferSize int
550 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
551 ReadBufferSize int
552 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
553 ChannelzParentID int64
554 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
555 MaxHeaderListSize *uint32
556}
557
// TargetInfo describes the target of a connection: its network address, its
// authority and arbitrary metadata attached to it.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
564
565// NewClientTransport establishes the transport with the required ConnectOptions
566// and returns it to the caller.
567func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
568 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
569}
570
// Options carries additional hints and information for the transmission of a
// message.
type Options struct {
	// Last reports whether this write is the last piece for this stream.
	Last bool
}
578
579// CallHdr carries the information of a particular RPC.
580type CallHdr struct {
581 // Host specifies the peer's host.
582 Host string
583
584 // Method specifies the operation to perform.
585 Method string
586
587 // SendCompress specifies the compression algorithm applied on
588 // outbound message.
589 SendCompress string
590
591 // Creds specifies credentials.PerRPCCredentials for a call.
592 Creds credentials.PerRPCCredentials
593
594 // ContentSubtype specifies the content-subtype for a request. For example, a
595 // content-subtype of "proto" will result in a content-type of
596 // "application/grpc+proto". The value of ContentSubtype must be all
597 // lowercase, otherwise the behavior is undefined. See
598 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
599 // for more details.
600 ContentSubtype string
601
602 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
603}
604
605// ClientTransport is the common interface for all gRPC client-side transport
606// implementations.
607type ClientTransport interface {
608 // Close tears down this transport. Once it returns, the transport
609 // should not be accessed any more. The caller must make sure this
610 // is called only once.
611 Close() error
612
Abhilash S.L3b494632019-07-16 15:51:09 +0530613 // GracefulClose starts to tear down the transport: the transport will stop
614 // accepting new RPCs and NewStream will return error. Once all streams are
615 // finished, the transport will close.
616 //
617 // It does not block.
618 GracefulClose()
William Kurkianea869482019-04-09 15:16:11 -0400619
620 // Write sends the data for the given stream. A nil stream indicates
621 // the write is to be performed on the transport as a whole.
622 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
623
624 // NewStream creates a Stream for an RPC.
625 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
626
627 // CloseStream clears the footprint of a stream when the stream is
628 // not needed any more. The err indicates the error incurred when
629 // CloseStream is called. Must be called when a stream is finished
630 // unless the associated transport is closing.
631 CloseStream(stream *Stream, err error)
632
633 // Error returns a channel that is closed when some I/O error
634 // happens. Typically the caller should have a goroutine to monitor
635 // this in order to take action (e.g., close the current transport
636 // and create a new one) in error case. It should not return nil
637 // once the transport is initiated.
638 Error() <-chan struct{}
639
640 // GoAway returns a channel that is closed when ClientTransport
641 // receives the draining signal from the server (e.g., GOAWAY frame in
642 // HTTP/2).
643 GoAway() <-chan struct{}
644
645 // GetGoAwayReason returns the reason why GoAway frame was received.
646 GetGoAwayReason() GoAwayReason
647
Abhilash S.L3b494632019-07-16 15:51:09 +0530648 // RemoteAddr returns the remote network address.
649 RemoteAddr() net.Addr
650
William Kurkianea869482019-04-09 15:16:11 -0400651 // IncrMsgSent increments the number of message sent through this transport.
652 IncrMsgSent()
653
654 // IncrMsgRecv increments the number of message received through this transport.
655 IncrMsgRecv()
656}
657
658// ServerTransport is the common interface for all gRPC server-side transport
659// implementations.
660//
661// Methods may be called concurrently from multiple goroutines, but
662// Write methods for a given Stream will be called serially.
663type ServerTransport interface {
664 // HandleStreams receives incoming streams using the given handler.
665 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
666
667 // WriteHeader sends the header metadata for the given stream.
668 // WriteHeader may not be called on all streams.
669 WriteHeader(s *Stream, md metadata.MD) error
670
671 // Write sends the data for the given stream.
672 // Write may not be called on all streams.
673 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
674
675 // WriteStatus sends the status of a stream to the client. WriteStatus is
676 // the final call made on a stream and always occurs.
677 WriteStatus(s *Stream, st *status.Status) error
678
679 // Close tears down the transport. Once it is called, the transport
680 // should not be accessed any more. All the pending streams and their
681 // handlers will be terminated asynchronously.
682 Close() error
683
684 // RemoteAddr returns the remote network address.
685 RemoteAddr() net.Addr
686
687 // Drain notifies the client this ServerTransport stops accepting new RPCs.
688 Drain()
689
690 // IncrMsgSent increments the number of message sent through this transport.
691 IncrMsgSent()
692
693 // IncrMsgRecv increments the number of message received through this transport.
694 IncrMsgRecv()
695}
696
// connectionErrorf creates a ConnectionError whose description is formatted
// from format and a. temp is the value later reported by Temporary, and e,
// which may be nil, is kept as the underlying cause.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	return ConnectionError{
		Desc: fmt.Sprintf(format, a...),
		temp: temp,
		err:  e,
	}
}

// ConnectionError is an error that results in the termination of the entire
// connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string
	temp bool
	err  error
}

// Error implements the error interface. Only Desc is included in the
// message; the underlying cause is available through Origin and Unwrap.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when there is no original error, the ConnectionError itself
// is returned.
func (e ConnectionError) Origin() error {
	if e.err == nil {
		return e
	}
	return e.err
}

// Unwrap returns the original error of this connection error, or nil if
// there is none, so that errors.Is and errors.As can inspect the cause.
// Unlike Origin it does not fall back to the ConnectionError itself, which
// would make unwrapping loop forever.
func (e ConnectionError) Unwrap() error {
	return e.err
}
732
733var (
734 // ErrConnClosing indicates that the transport is closing.
735 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
736 // errStreamDrain indicates that the stream is rejected because the
737 // connection is draining. This could be caused by goaway or balancer
738 // removing the address.
739 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
740 // errStreamDone is returned from write at the client side to indiacte application
741 // layer of an error.
742 errStreamDone = errors.New("the stream is done")
743 // StatusGoAway indicates that the server sent a GOAWAY that included this
744 // stream's ID in unprocessed RPCs.
745 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
746)
747
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

// The numeric values are 0, 1 and 2 in declaration order; do not reorder.
const (
	// GoAwayInvalid indicates that no GoAway frame has been received.
	GoAwayInvalid GoAwayReason = iota
	// GoAwayNoReason is the default value when a GoAway frame is received.
	GoAwayNoReason
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that its debug data said
	// "too_many_pings".
	GoAwayTooManyPings
)
761
// channelzData stores the channelz related data of http2Client and
// http2Server.
//
// These fields cannot be embedded directly in the original structs (e.g.
// http2Client): atomic operations on an int64 variable on a 32-bit machine
// require the user to enforce memory alignment. Grouping the int64 fields in
// a struct of their own enforces that alignment.
type channelzData struct {
	kpCount int64
	// streamsStarted is the number of streams that have started, including
	// those that have already finished.
	streamsStarted int64
	// streamsSucceeded is, on the client side, the number of streams that
	// ended successfully by receiving a frame with the EoS bit set from the
	// server; on the server side, the number of streams that ended
	// successfully by sending a frame with the EoS bit set.
	streamsSucceeded int64
	streamsFailed    int64
	// lastStreamCreatedTime stores the timestamp at which the last stream
	// was created. It is an int64 rather than a time.Time because updating
	// a time.Time atomically is more costly than updating an int64. The
	// same applies to lastMsgSentTime and lastMsgRecvTime.
	lastStreamCreatedTime int64
	msgSent               int64
	msgRecv               int64
	lastMsgSentTime       int64
	lastMsgRecvTime       int64
}
785
786// ContextErr converts the error from context package into a status error.
787func ContextErr(err error) error {
788 switch err {
789 case context.DeadlineExceeded:
790 return status.Error(codes.DeadlineExceeded, err.Error())
791 case context.Canceled:
792 return status.Error(codes.Canceled, err.Error())
793 }
794 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
795}