blob: 1c1d106709ac0611ab794c96c4e26ada11e4065c [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
divyadesai19009132020-03-04 12:58:08 +000025 "bytes"
Zack Williamse940c7a2019-08-21 14:25:39 -070026 "context"
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/stats"
39 "google.golang.org/grpc/status"
40 "google.golang.org/grpc/tap"
41)
42
divyadesai19009132020-03-04 12:58:08 +000043type bufferPool struct {
44 pool sync.Pool
45}
46
47func newBufferPool() *bufferPool {
48 return &bufferPool{
49 pool: sync.Pool{
50 New: func() interface{} {
51 return new(bytes.Buffer)
52 },
53 },
54 }
55}
56
57func (p *bufferPool) get() *bytes.Buffer {
58 return p.pool.Get().(*bytes.Buffer)
59}
60
61func (p *bufferPool) put(b *bytes.Buffer) {
62 p.pool.Put(b)
63}
64
Zack Williamse940c7a2019-08-21 14:25:39 -070065// recvMsg represents the received msg from the transport. All transport
66// protocol specific info has been removed.
67type recvMsg struct {
divyadesai19009132020-03-04 12:58:08 +000068 buffer *bytes.Buffer
Zack Williamse940c7a2019-08-21 14:25:39 -070069 // nil: received some data
70 // io.EOF: stream is completed. data is nil.
71 // other non-nil error: transport failure. data is nil.
72 err error
73}
74
75// recvBuffer is an unbounded channel of recvMsg structs.
76// Note recvBuffer differs from controlBuffer only in that recvBuffer
77// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
78// recvBuffer is written to much more often than
79// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
80type recvBuffer struct {
81 c chan recvMsg
82 mu sync.Mutex
83 backlog []recvMsg
84 err error
85}
86
87func newRecvBuffer() *recvBuffer {
88 b := &recvBuffer{
89 c: make(chan recvMsg, 1),
90 }
91 return b
92}
93
94func (b *recvBuffer) put(r recvMsg) {
95 b.mu.Lock()
96 if b.err != nil {
97 b.mu.Unlock()
98 // An error had occurred earlier, don't accept more
99 // data or errors.
100 return
101 }
102 b.err = r.err
103 if len(b.backlog) == 0 {
104 select {
105 case b.c <- r:
106 b.mu.Unlock()
107 return
108 default:
109 }
110 }
111 b.backlog = append(b.backlog, r)
112 b.mu.Unlock()
113}
114
115func (b *recvBuffer) load() {
116 b.mu.Lock()
117 if len(b.backlog) > 0 {
118 select {
119 case b.c <- b.backlog[0]:
120 b.backlog[0] = recvMsg{}
121 b.backlog = b.backlog[1:]
122 default:
123 }
124 }
125 b.mu.Unlock()
126}
127
128// get returns the channel that receives a recvMsg in the buffer.
129//
130// Upon receipt of a recvMsg, the caller should call load to send another
131// recvMsg onto the channel if there is any.
132func (b *recvBuffer) get() <-chan recvMsg {
133 return b.c
134}
135
136// recvBufferReader implements io.Reader interface to read the data from
137// recvBuffer.
138type recvBufferReader struct {
139 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
140 ctx context.Context
141 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
142 recv *recvBuffer
divyadesai19009132020-03-04 12:58:08 +0000143 last *bytes.Buffer // Stores the remaining data in the previous calls.
Zack Williamse940c7a2019-08-21 14:25:39 -0700144 err error
divyadesai19009132020-03-04 12:58:08 +0000145 freeBuffer func(*bytes.Buffer)
Zack Williamse940c7a2019-08-21 14:25:39 -0700146}
147
148// Read reads the next len(p) bytes from last. If last is drained, it tries to
149// read additional data from recv. It blocks if there no additional data available
150// in recv. If Read returns any non-nil error, it will continue to return that error.
151func (r *recvBufferReader) Read(p []byte) (n int, err error) {
152 if r.err != nil {
153 return 0, r.err
154 }
divyadesai19009132020-03-04 12:58:08 +0000155 if r.last != nil {
Zack Williamse940c7a2019-08-21 14:25:39 -0700156 // Read remaining data left in last call.
divyadesai19009132020-03-04 12:58:08 +0000157 copied, _ := r.last.Read(p)
158 if r.last.Len() == 0 {
159 r.freeBuffer(r.last)
160 r.last = nil
161 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700162 return copied, nil
163 }
164 if r.closeStream != nil {
165 n, r.err = r.readClient(p)
166 } else {
167 n, r.err = r.read(p)
168 }
169 return n, r.err
170}
171
172func (r *recvBufferReader) read(p []byte) (n int, err error) {
173 select {
174 case <-r.ctxDone:
175 return 0, ContextErr(r.ctx.Err())
176 case m := <-r.recv.get():
177 return r.readAdditional(m, p)
178 }
179}
180
181func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
182 // If the context is canceled, then closes the stream with nil metadata.
183 // closeStream writes its error parameter to r.recv as a recvMsg.
184 // r.readAdditional acts on that message and returns the necessary error.
185 select {
186 case <-r.ctxDone:
divyadesai19009132020-03-04 12:58:08 +0000187 // Note that this adds the ctx error to the end of recv buffer, and
188 // reads from the head. This will delay the error until recv buffer is
189 // empty, thus will delay ctx cancellation in Recv().
190 //
191 // It's done this way to fix a race between ctx cancel and trailer. The
192 // race was, stream.Recv() may return ctx error if ctxDone wins the
193 // race, but stream.Trailer() may return a non-nil md because the stream
194 // was not marked as done when trailer is received. This closeStream
195 // call will mark stream as done, thus fix the race.
196 //
197 // TODO: delaying ctx error seems like a unnecessary side effect. What
198 // we really want is to mark the stream as done, and return ctx error
199 // faster.
Zack Williamse940c7a2019-08-21 14:25:39 -0700200 r.closeStream(ContextErr(r.ctx.Err()))
201 m := <-r.recv.get()
202 return r.readAdditional(m, p)
203 case m := <-r.recv.get():
204 return r.readAdditional(m, p)
205 }
206}
207
208func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
209 r.recv.load()
210 if m.err != nil {
211 return 0, m.err
212 }
divyadesai19009132020-03-04 12:58:08 +0000213 copied, _ := m.buffer.Read(p)
214 if m.buffer.Len() == 0 {
215 r.freeBuffer(m.buffer)
216 r.last = nil
217 } else {
218 r.last = m.buffer
219 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700220 return copied, nil
221}
222
223type streamState uint32
224
225const (
226 streamActive streamState = iota
227 streamWriteDone // EndStream sent
228 streamReadDone // EndStream received
229 streamDone // the entire stream is finished.
230)
231
232// Stream represents an RPC in the transport layer.
233type Stream struct {
234 id uint32
235 st ServerTransport // nil for client side Stream
236 ctx context.Context // the associated context of the stream
237 cancel context.CancelFunc // always nil for client side Stream
238 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
239 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
240 method string // the associated RPC method of the stream
241 recvCompress string
242 sendCompress string
243 buf *recvBuffer
244 trReader io.Reader
245 fc *inFlow
246 wq *writeQuota
247
248 // Callback to state application's intentions to read data. This
249 // is used to adjust flow control, if needed.
250 requestRead func(int)
251
divyadesai19009132020-03-04 12:58:08 +0000252 headerChan chan struct{} // closed to indicate the end of header metadata.
253 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
Zack Williamse940c7a2019-08-21 14:25:39 -0700254
255 // hdrMu protects header and trailer metadata on the server-side.
256 hdrMu sync.Mutex
257 // On client side, header keeps the received header metadata.
258 //
259 // On server side, header keeps the header set by SetHeader(). The complete
260 // header will merged into this after t.WriteHeader() is called.
261 header metadata.MD
262 trailer metadata.MD // the key-value map of trailer metadata.
263
264 noHeaders bool // set if the client never received headers (set only after the stream is done).
265
266 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
267 headerSent uint32
268
269 state streamState
270
271 // On client-side it is the status error received from the server.
272 // On server-side it is unused.
273 status *status.Status
274
275 bytesReceived uint32 // indicates whether any bytes have been received on this stream
276 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
277
278 // contentSubtype is the content-subtype for requests.
279 // this must be lowercase or the behavior is undefined.
280 contentSubtype string
281}
282
283// isHeaderSent is only valid on the server-side.
284func (s *Stream) isHeaderSent() bool {
285 return atomic.LoadUint32(&s.headerSent) == 1
286}
287
288// updateHeaderSent updates headerSent and returns true
289// if it was alreay set. It is valid only on server-side.
290func (s *Stream) updateHeaderSent() bool {
291 return atomic.SwapUint32(&s.headerSent, 1) == 1
292}
293
294func (s *Stream) swapState(st streamState) streamState {
295 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
296}
297
298func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
299 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
300}
301
302func (s *Stream) getState() streamState {
303 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
304}
305
306func (s *Stream) waitOnHeader() error {
307 if s.headerChan == nil {
308 // On the server headerChan is always nil since a stream originates
309 // only after having received headers.
310 return nil
311 }
312 select {
313 case <-s.ctx.Done():
divyadesai19009132020-03-04 12:58:08 +0000314 // We prefer success over failure when reading messages because we delay
315 // context error in stream.Read(). To keep behavior consistent, we also
316 // prefer success here.
317 select {
318 case <-s.headerChan:
319 return nil
320 default:
321 }
Zack Williamse940c7a2019-08-21 14:25:39 -0700322 return ContextErr(s.ctx.Err())
323 case <-s.headerChan:
324 return nil
325 }
326}
327
328// RecvCompress returns the compression algorithm applied to the inbound
329// message. It is empty string if there is no compression applied.
330func (s *Stream) RecvCompress() string {
331 if err := s.waitOnHeader(); err != nil {
332 return ""
333 }
334 return s.recvCompress
335}
336
337// SetSendCompress sets the compression algorithm to the stream.
338func (s *Stream) SetSendCompress(str string) {
339 s.sendCompress = str
340}
341
342// Done returns a channel which is closed when it receives the final status
343// from the server.
344func (s *Stream) Done() <-chan struct{} {
345 return s.done
346}
347
348// Header returns the header metadata of the stream.
349//
350// On client side, it acquires the key-value pairs of header metadata once it is
351// available. It blocks until i) the metadata is ready or ii) there is no header
352// metadata or iii) the stream is canceled/expired.
353//
354// On server side, it returns the out header after t.WriteHeader is called.
355func (s *Stream) Header() (metadata.MD, error) {
356 if s.headerChan == nil && s.header != nil {
357 // On server side, return the header in stream. It will be the out
358 // header after t.WriteHeader is called.
359 return s.header.Copy(), nil
360 }
361 err := s.waitOnHeader()
362 // Even if the stream is closed, header is returned if available.
363 select {
364 case <-s.headerChan:
365 if s.header == nil {
366 return nil, nil
367 }
368 return s.header.Copy(), nil
369 default:
370 }
371 return nil, err
372}
373
374// TrailersOnly blocks until a header or trailers-only frame is received and
375// then returns true if the stream was trailers-only. If the stream ends
376// before headers are received, returns true, nil. If a context error happens
377// first, returns it as a status error. Client-side only.
378func (s *Stream) TrailersOnly() (bool, error) {
379 err := s.waitOnHeader()
380 if err != nil {
381 return false, err
382 }
383 return s.noHeaders, nil
384}
385
386// Trailer returns the cached trailer metedata. Note that if it is not called
387// after the entire stream is done, it could return an empty MD. Client
388// side only.
389// It can be safely read only after stream has ended that is either read
390// or write have returned io.EOF.
391func (s *Stream) Trailer() metadata.MD {
392 c := s.trailer.Copy()
393 return c
394}
395
396// ContentSubtype returns the content-subtype for a request. For example, a
397// content-subtype of "proto" will result in a content-type of
398// "application/grpc+proto". This will always be lowercase. See
399// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
400// more details.
401func (s *Stream) ContentSubtype() string {
402 return s.contentSubtype
403}
404
405// Context returns the context of the stream.
406func (s *Stream) Context() context.Context {
407 return s.ctx
408}
409
410// Method returns the method for the stream.
411func (s *Stream) Method() string {
412 return s.method
413}
414
415// Status returns the status received from the server.
416// Status can be read safely only after the stream has ended,
417// that is, after Done() is closed.
418func (s *Stream) Status() *status.Status {
419 return s.status
420}
421
422// SetHeader sets the header metadata. This can be called multiple times.
423// Server side only.
424// This should not be called in parallel to other data writes.
425func (s *Stream) SetHeader(md metadata.MD) error {
426 if md.Len() == 0 {
427 return nil
428 }
429 if s.isHeaderSent() || s.getState() == streamDone {
430 return ErrIllegalHeaderWrite
431 }
432 s.hdrMu.Lock()
433 s.header = metadata.Join(s.header, md)
434 s.hdrMu.Unlock()
435 return nil
436}
437
438// SendHeader sends the given header metadata. The given metadata is
439// combined with any metadata set by previous calls to SetHeader and
440// then written to the transport stream.
441func (s *Stream) SendHeader(md metadata.MD) error {
442 return s.st.WriteHeader(s, md)
443}
444
445// SetTrailer sets the trailer metadata which will be sent with the RPC status
446// by the server. This can be called multiple times. Server side only.
447// This should not be called parallel to other data writes.
448func (s *Stream) SetTrailer(md metadata.MD) error {
449 if md.Len() == 0 {
450 return nil
451 }
452 if s.getState() == streamDone {
453 return ErrIllegalHeaderWrite
454 }
455 s.hdrMu.Lock()
456 s.trailer = metadata.Join(s.trailer, md)
457 s.hdrMu.Unlock()
458 return nil
459}
460
461func (s *Stream) write(m recvMsg) {
462 s.buf.put(m)
463}
464
465// Read reads all p bytes from the wire for this stream.
466func (s *Stream) Read(p []byte) (n int, err error) {
467 // Don't request a read if there was an error earlier
468 if er := s.trReader.(*transportReader).er; er != nil {
469 return 0, er
470 }
471 s.requestRead(len(p))
472 return io.ReadFull(s.trReader, p)
473}
474
475// tranportReader reads all the data available for this Stream from the transport and
476// passes them into the decoder, which converts them into a gRPC message stream.
477// The error is io.EOF when the stream is done or another non-nil error if
478// the stream broke.
479type transportReader struct {
480 reader io.Reader
481 // The handler to control the window update procedure for both this
482 // particular stream and the associated transport.
483 windowHandler func(int)
484 er error
485}
486
487func (t *transportReader) Read(p []byte) (n int, err error) {
488 n, err = t.reader.Read(p)
489 if err != nil {
490 t.er = err
491 return
492 }
493 t.windowHandler(n)
494 return
495}
496
497// BytesReceived indicates whether any bytes have been received on this stream.
498func (s *Stream) BytesReceived() bool {
499 return atomic.LoadUint32(&s.bytesReceived) == 1
500}
501
502// Unprocessed indicates whether the server did not process this stream --
503// i.e. it sent a refused stream or GOAWAY including this stream ID.
504func (s *Stream) Unprocessed() bool {
505 return atomic.LoadUint32(&s.unprocessed) == 1
506}
507
508// GoString is implemented by Stream so context.String() won't
509// race when printing %#v.
510func (s *Stream) GoString() string {
511 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
512}
513
514// state of transport
515type transportState int
516
517const (
518 reachable transportState = iota
519 closing
520 draining
521)
522
523// ServerConfig consists of all the configurations to establish a server transport.
524type ServerConfig struct {
525 MaxStreams uint32
526 AuthInfo credentials.AuthInfo
527 InTapHandle tap.ServerInHandle
528 StatsHandler stats.Handler
529 KeepaliveParams keepalive.ServerParameters
530 KeepalivePolicy keepalive.EnforcementPolicy
531 InitialWindowSize int32
532 InitialConnWindowSize int32
533 WriteBufferSize int
534 ReadBufferSize int
535 ChannelzParentID int64
536 MaxHeaderListSize *uint32
537}
538
539// NewServerTransport creates a ServerTransport with conn or non-nil error
540// if it fails.
541func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
542 return newHTTP2Server(conn, config)
543}
544
545// ConnectOptions covers all relevant options for communicating with the server.
546type ConnectOptions struct {
547 // UserAgent is the application user agent.
548 UserAgent string
549 // Dialer specifies how to dial a network address.
550 Dialer func(context.Context, string) (net.Conn, error)
551 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
552 FailOnNonTempDialError bool
553 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
554 PerRPCCredentials []credentials.PerRPCCredentials
555 // TransportCredentials stores the Authenticator required to setup a client
556 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
557 TransportCredentials credentials.TransportCredentials
558 // CredsBundle is the credentials bundle to be used. Only one of
559 // TransportCredentials and CredsBundle is non-nil.
560 CredsBundle credentials.Bundle
561 // KeepaliveParams stores the keepalive parameters.
562 KeepaliveParams keepalive.ClientParameters
563 // StatsHandler stores the handler for stats.
564 StatsHandler stats.Handler
565 // InitialWindowSize sets the initial window size for a stream.
566 InitialWindowSize int32
567 // InitialConnWindowSize sets the initial window size for a connection.
568 InitialConnWindowSize int32
569 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
570 WriteBufferSize int
571 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
572 ReadBufferSize int
573 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
574 ChannelzParentID int64
575 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
576 MaxHeaderListSize *uint32
577}
578
579// TargetInfo contains the information of the target such as network address and metadata.
580type TargetInfo struct {
581 Addr string
582 Metadata interface{}
583 Authority string
584}
585
586// NewClientTransport establishes the transport with the required ConnectOptions
587// and returns it to the caller.
588func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
589 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
590}
591
592// Options provides additional hints and information for message
593// transmission.
594type Options struct {
595 // Last indicates whether this write is the last piece for
596 // this stream.
597 Last bool
598}
599
600// CallHdr carries the information of a particular RPC.
601type CallHdr struct {
602 // Host specifies the peer's host.
603 Host string
604
605 // Method specifies the operation to perform.
606 Method string
607
608 // SendCompress specifies the compression algorithm applied on
609 // outbound message.
610 SendCompress string
611
612 // Creds specifies credentials.PerRPCCredentials for a call.
613 Creds credentials.PerRPCCredentials
614
615 // ContentSubtype specifies the content-subtype for a request. For example, a
616 // content-subtype of "proto" will result in a content-type of
617 // "application/grpc+proto". The value of ContentSubtype must be all
618 // lowercase, otherwise the behavior is undefined. See
619 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
620 // for more details.
621 ContentSubtype string
622
623 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
624}
625
626// ClientTransport is the common interface for all gRPC client-side transport
627// implementations.
628type ClientTransport interface {
629 // Close tears down this transport. Once it returns, the transport
630 // should not be accessed any more. The caller must make sure this
631 // is called only once.
632 Close() error
633
634 // GracefulClose starts to tear down the transport: the transport will stop
635 // accepting new RPCs and NewStream will return error. Once all streams are
636 // finished, the transport will close.
637 //
638 // It does not block.
639 GracefulClose()
640
641 // Write sends the data for the given stream. A nil stream indicates
642 // the write is to be performed on the transport as a whole.
643 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
644
645 // NewStream creates a Stream for an RPC.
646 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
647
648 // CloseStream clears the footprint of a stream when the stream is
649 // not needed any more. The err indicates the error incurred when
650 // CloseStream is called. Must be called when a stream is finished
651 // unless the associated transport is closing.
652 CloseStream(stream *Stream, err error)
653
654 // Error returns a channel that is closed when some I/O error
655 // happens. Typically the caller should have a goroutine to monitor
656 // this in order to take action (e.g., close the current transport
657 // and create a new one) in error case. It should not return nil
658 // once the transport is initiated.
659 Error() <-chan struct{}
660
661 // GoAway returns a channel that is closed when ClientTransport
662 // receives the draining signal from the server (e.g., GOAWAY frame in
663 // HTTP/2).
664 GoAway() <-chan struct{}
665
666 // GetGoAwayReason returns the reason why GoAway frame was received.
667 GetGoAwayReason() GoAwayReason
668
669 // RemoteAddr returns the remote network address.
670 RemoteAddr() net.Addr
671
672 // IncrMsgSent increments the number of message sent through this transport.
673 IncrMsgSent()
674
675 // IncrMsgRecv increments the number of message received through this transport.
676 IncrMsgRecv()
677}
678
679// ServerTransport is the common interface for all gRPC server-side transport
680// implementations.
681//
682// Methods may be called concurrently from multiple goroutines, but
683// Write methods for a given Stream will be called serially.
684type ServerTransport interface {
685 // HandleStreams receives incoming streams using the given handler.
686 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
687
688 // WriteHeader sends the header metadata for the given stream.
689 // WriteHeader may not be called on all streams.
690 WriteHeader(s *Stream, md metadata.MD) error
691
692 // Write sends the data for the given stream.
693 // Write may not be called on all streams.
694 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
695
696 // WriteStatus sends the status of a stream to the client. WriteStatus is
697 // the final call made on a stream and always occurs.
698 WriteStatus(s *Stream, st *status.Status) error
699
700 // Close tears down the transport. Once it is called, the transport
701 // should not be accessed any more. All the pending streams and their
702 // handlers will be terminated asynchronously.
703 Close() error
704
705 // RemoteAddr returns the remote network address.
706 RemoteAddr() net.Addr
707
708 // Drain notifies the client this ServerTransport stops accepting new RPCs.
709 Drain()
710
711 // IncrMsgSent increments the number of message sent through this transport.
712 IncrMsgSent()
713
714 // IncrMsgRecv increments the number of message received through this transport.
715 IncrMsgRecv()
716}
717
718// connectionErrorf creates an ConnectionError with the specified error description.
719func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
720 return ConnectionError{
721 Desc: fmt.Sprintf(format, a...),
722 temp: temp,
723 err: e,
724 }
725}
726
727// ConnectionError is an error that results in the termination of the
728// entire connection and the retry of all the active streams.
729type ConnectionError struct {
730 Desc string
731 temp bool
732 err error
733}
734
735func (e ConnectionError) Error() string {
736 return fmt.Sprintf("connection error: desc = %q", e.Desc)
737}
738
739// Temporary indicates if this connection error is temporary or fatal.
740func (e ConnectionError) Temporary() bool {
741 return e.temp
742}
743
744// Origin returns the original error of this connection error.
745func (e ConnectionError) Origin() error {
746 // Never return nil error here.
747 // If the original error is nil, return itself.
748 if e.err == nil {
749 return e
750 }
751 return e.err
752}
753
754var (
755 // ErrConnClosing indicates that the transport is closing.
756 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
757 // errStreamDrain indicates that the stream is rejected because the
758 // connection is draining. This could be caused by goaway or balancer
759 // removing the address.
760 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
761 // errStreamDone is returned from write at the client side to indiacte application
762 // layer of an error.
763 errStreamDone = errors.New("the stream is done")
764 // StatusGoAway indicates that the server sent a GOAWAY that included this
765 // stream's ID in unprocessed RPCs.
766 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
767)
768
769// GoAwayReason contains the reason for the GoAway frame received.
770type GoAwayReason uint8
771
772const (
773 // GoAwayInvalid indicates that no GoAway frame is received.
774 GoAwayInvalid GoAwayReason = 0
775 // GoAwayNoReason is the default value when GoAway frame is received.
776 GoAwayNoReason GoAwayReason = 1
777 // GoAwayTooManyPings indicates that a GoAway frame with
778 // ErrCodeEnhanceYourCalm was received and that the debug data said
779 // "too_many_pings".
780 GoAwayTooManyPings GoAwayReason = 2
781)
782
783// channelzData is used to store channelz related data for http2Client and http2Server.
784// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
785// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
786// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
787type channelzData struct {
788 kpCount int64
789 // The number of streams that have started, including already finished ones.
790 streamsStarted int64
791 // Client side: The number of streams that have ended successfully by receiving
792 // EoS bit set frame from server.
793 // Server side: The number of streams that have ended successfully by sending
794 // frame with EoS bit set.
795 streamsSucceeded int64
796 streamsFailed int64
797 // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
798 // instead of time.Time since it's more costly to atomically update time.Time variable than int64
799 // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
800 lastStreamCreatedTime int64
801 msgSent int64
802 msgRecv int64
803 lastMsgSentTime int64
804 lastMsgRecvTime int64
805}
806
807// ContextErr converts the error from context package into a status error.
808func ContextErr(err error) error {
809 switch err {
810 case context.DeadlineExceeded:
811 return status.Error(codes.DeadlineExceeded, err.Error())
812 case context.Canceled:
813 return status.Error(codes.Canceled, err.Error())
814 }
815 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
816}