blob: bfab940bd0de23057730562e5adc6a905f7e4445 [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC). It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25 "bytes"
26 "context"
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "sync"
32 "sync/atomic"
33
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/stats"
39 "google.golang.org/grpc/status"
40 "google.golang.org/grpc/tap"
41)
42
43type bufferPool struct {
44 pool sync.Pool
45}
46
47func newBufferPool() *bufferPool {
48 return &bufferPool{
49 pool: sync.Pool{
50 New: func() interface{} {
51 return new(bytes.Buffer)
52 },
53 },
54 }
55}
56
57func (p *bufferPool) get() *bytes.Buffer {
58 return p.pool.Get().(*bytes.Buffer)
59}
60
61func (p *bufferPool) put(b *bytes.Buffer) {
62 p.pool.Put(b)
63}
64
65// recvMsg represents the received msg from the transport. All transport
66// protocol specific info has been removed.
67type recvMsg struct {
68 buffer *bytes.Buffer
69 // nil: received some data
70 // io.EOF: stream is completed. data is nil.
71 // other non-nil error: transport failure. data is nil.
72 err error
73}
74
75// recvBuffer is an unbounded channel of recvMsg structs.
76// Note recvBuffer differs from controlBuffer only in that recvBuffer
77// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
78// recvBuffer is written to much more often than
79// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
80type recvBuffer struct {
81 c chan recvMsg
82 mu sync.Mutex
83 backlog []recvMsg
84 err error
85}
86
87func newRecvBuffer() *recvBuffer {
88 b := &recvBuffer{
89 c: make(chan recvMsg, 1),
90 }
91 return b
92}
93
94func (b *recvBuffer) put(r recvMsg) {
95 b.mu.Lock()
96 if b.err != nil {
97 b.mu.Unlock()
98 // An error had occurred earlier, don't accept more
99 // data or errors.
100 return
101 }
102 b.err = r.err
103 if len(b.backlog) == 0 {
104 select {
105 case b.c <- r:
106 b.mu.Unlock()
107 return
108 default:
109 }
110 }
111 b.backlog = append(b.backlog, r)
112 b.mu.Unlock()
113}
114
115func (b *recvBuffer) load() {
116 b.mu.Lock()
117 if len(b.backlog) > 0 {
118 select {
119 case b.c <- b.backlog[0]:
120 b.backlog[0] = recvMsg{}
121 b.backlog = b.backlog[1:]
122 default:
123 }
124 }
125 b.mu.Unlock()
126}
127
128// get returns the channel that receives a recvMsg in the buffer.
129//
130// Upon receipt of a recvMsg, the caller should call load to send another
131// recvMsg onto the channel if there is any.
132func (b *recvBuffer) get() <-chan recvMsg {
133 return b.c
134}
135
136// recvBufferReader implements io.Reader interface to read the data from
137// recvBuffer.
138type recvBufferReader struct {
139 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
140 ctx context.Context
141 ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
142 recv *recvBuffer
143 last *bytes.Buffer // Stores the remaining data in the previous calls.
144 err error
145 freeBuffer func(*bytes.Buffer)
146}
147
148// Read reads the next len(p) bytes from last. If last is drained, it tries to
149// read additional data from recv. It blocks if there no additional data available
150// in recv. If Read returns any non-nil error, it will continue to return that error.
151func (r *recvBufferReader) Read(p []byte) (n int, err error) {
152 if r.err != nil {
153 return 0, r.err
154 }
155 if r.last != nil {
156 // Read remaining data left in last call.
157 copied, _ := r.last.Read(p)
158 if r.last.Len() == 0 {
159 r.freeBuffer(r.last)
160 r.last = nil
161 }
162 return copied, nil
163 }
164 if r.closeStream != nil {
165 n, r.err = r.readClient(p)
166 } else {
167 n, r.err = r.read(p)
168 }
169 return n, r.err
170}
171
172func (r *recvBufferReader) read(p []byte) (n int, err error) {
173 select {
174 case <-r.ctxDone:
175 return 0, ContextErr(r.ctx.Err())
176 case m := <-r.recv.get():
177 return r.readAdditional(m, p)
178 }
179}
180
181func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
182 // If the context is canceled, then closes the stream with nil metadata.
183 // closeStream writes its error parameter to r.recv as a recvMsg.
184 // r.readAdditional acts on that message and returns the necessary error.
185 select {
186 case <-r.ctxDone:
187 // Note that this adds the ctx error to the end of recv buffer, and
188 // reads from the head. This will delay the error until recv buffer is
189 // empty, thus will delay ctx cancellation in Recv().
190 //
191 // It's done this way to fix a race between ctx cancel and trailer. The
192 // race was, stream.Recv() may return ctx error if ctxDone wins the
193 // race, but stream.Trailer() may return a non-nil md because the stream
194 // was not marked as done when trailer is received. This closeStream
195 // call will mark stream as done, thus fix the race.
196 //
197 // TODO: delaying ctx error seems like a unnecessary side effect. What
198 // we really want is to mark the stream as done, and return ctx error
199 // faster.
200 r.closeStream(ContextErr(r.ctx.Err()))
201 m := <-r.recv.get()
202 return r.readAdditional(m, p)
203 case m := <-r.recv.get():
204 return r.readAdditional(m, p)
205 }
206}
207
208func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
209 r.recv.load()
210 if m.err != nil {
211 return 0, m.err
212 }
213 copied, _ := m.buffer.Read(p)
214 if m.buffer.Len() == 0 {
215 r.freeBuffer(m.buffer)
216 r.last = nil
217 } else {
218 r.last = m.buffer
219 }
220 return copied, nil
221}
222
223type streamState uint32
224
225const (
226 streamActive streamState = iota
227 streamWriteDone // EndStream sent
228 streamReadDone // EndStream received
229 streamDone // the entire stream is finished.
230)
231
232// Stream represents an RPC in the transport layer.
233type Stream struct {
234 id uint32
235 st ServerTransport // nil for client side Stream
Don Newtone0d34a82019-11-14 10:58:06 -0500236 ct *http2Client // nil for server side Stream
Don Newton98fd8812019-09-23 15:15:02 -0400237 ctx context.Context // the associated context of the stream
238 cancel context.CancelFunc // always nil for client side Stream
239 done chan struct{} // closed at the end of stream to unblock writers. On the client side.
240 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
241 method string // the associated RPC method of the stream
242 recvCompress string
243 sendCompress string
244 buf *recvBuffer
245 trReader io.Reader
246 fc *inFlow
247 wq *writeQuota
248
249 // Callback to state application's intentions to read data. This
250 // is used to adjust flow control, if needed.
251 requestRead func(int)
252
253 headerChan chan struct{} // closed to indicate the end of header metadata.
254 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
Don Newtone0d34a82019-11-14 10:58:06 -0500255 // headerValid indicates whether a valid header was received. Only
256 // meaningful after headerChan is closed (always call waitOnHeader() before
257 // reading its value). Not valid on server side.
258 headerValid bool
Don Newton98fd8812019-09-23 15:15:02 -0400259
260 // hdrMu protects header and trailer metadata on the server-side.
261 hdrMu sync.Mutex
262 // On client side, header keeps the received header metadata.
263 //
264 // On server side, header keeps the header set by SetHeader(). The complete
265 // header will merged into this after t.WriteHeader() is called.
266 header metadata.MD
267 trailer metadata.MD // the key-value map of trailer metadata.
268
269 noHeaders bool // set if the client never received headers (set only after the stream is done).
270
271 // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
272 headerSent uint32
273
274 state streamState
275
276 // On client-side it is the status error received from the server.
277 // On server-side it is unused.
278 status *status.Status
279
280 bytesReceived uint32 // indicates whether any bytes have been received on this stream
281 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
282
283 // contentSubtype is the content-subtype for requests.
284 // this must be lowercase or the behavior is undefined.
285 contentSubtype string
286}
287
288// isHeaderSent is only valid on the server-side.
289func (s *Stream) isHeaderSent() bool {
290 return atomic.LoadUint32(&s.headerSent) == 1
291}
292
293// updateHeaderSent updates headerSent and returns true
294// if it was alreay set. It is valid only on server-side.
295func (s *Stream) updateHeaderSent() bool {
296 return atomic.SwapUint32(&s.headerSent, 1) == 1
297}
298
299func (s *Stream) swapState(st streamState) streamState {
300 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
301}
302
303func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
304 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
305}
306
307func (s *Stream) getState() streamState {
308 return streamState(atomic.LoadUint32((*uint32)(&s.state)))
309}
310
Don Newtone0d34a82019-11-14 10:58:06 -0500311func (s *Stream) waitOnHeader() {
Don Newton98fd8812019-09-23 15:15:02 -0400312 if s.headerChan == nil {
313 // On the server headerChan is always nil since a stream originates
314 // only after having received headers.
Don Newtone0d34a82019-11-14 10:58:06 -0500315 return
Don Newton98fd8812019-09-23 15:15:02 -0400316 }
317 select {
318 case <-s.ctx.Done():
Don Newtone0d34a82019-11-14 10:58:06 -0500319 // Close the stream to prevent headers/trailers from changing after
320 // this function returns.
321 s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
322 // headerChan could possibly not be closed yet if closeStream raced
323 // with operateHeaders; wait until it is closed explicitly here.
324 <-s.headerChan
Don Newton98fd8812019-09-23 15:15:02 -0400325 case <-s.headerChan:
Don Newton98fd8812019-09-23 15:15:02 -0400326 }
327}
328
329// RecvCompress returns the compression algorithm applied to the inbound
330// message. It is empty string if there is no compression applied.
331func (s *Stream) RecvCompress() string {
Don Newtone0d34a82019-11-14 10:58:06 -0500332 s.waitOnHeader()
Don Newton98fd8812019-09-23 15:15:02 -0400333 return s.recvCompress
334}
335
336// SetSendCompress sets the compression algorithm to the stream.
337func (s *Stream) SetSendCompress(str string) {
338 s.sendCompress = str
339}
340
341// Done returns a channel which is closed when it receives the final status
342// from the server.
343func (s *Stream) Done() <-chan struct{} {
344 return s.done
345}
346
347// Header returns the header metadata of the stream.
348//
349// On client side, it acquires the key-value pairs of header metadata once it is
350// available. It blocks until i) the metadata is ready or ii) there is no header
351// metadata or iii) the stream is canceled/expired.
352//
Don Newtone0d34a82019-11-14 10:58:06 -0500353// On server side, it returns the out header after t.WriteHeader is called. It
354// does not block and must not be called until after WriteHeader.
Don Newton98fd8812019-09-23 15:15:02 -0400355func (s *Stream) Header() (metadata.MD, error) {
Don Newtone0d34a82019-11-14 10:58:06 -0500356 if s.headerChan == nil {
Don Newton98fd8812019-09-23 15:15:02 -0400357 // On server side, return the header in stream. It will be the out
358 // header after t.WriteHeader is called.
359 return s.header.Copy(), nil
360 }
Don Newtone0d34a82019-11-14 10:58:06 -0500361 s.waitOnHeader()
362 if !s.headerValid {
363 return nil, s.status.Err()
Don Newton98fd8812019-09-23 15:15:02 -0400364 }
Don Newtone0d34a82019-11-14 10:58:06 -0500365 return s.header.Copy(), nil
Don Newton98fd8812019-09-23 15:15:02 -0400366}
367
368// TrailersOnly blocks until a header or trailers-only frame is received and
369// then returns true if the stream was trailers-only. If the stream ends
Don Newtone0d34a82019-11-14 10:58:06 -0500370// before headers are received, returns true, nil. Client-side only.
371func (s *Stream) TrailersOnly() bool {
372 s.waitOnHeader()
373 return s.noHeaders
Don Newton98fd8812019-09-23 15:15:02 -0400374}
375
376// Trailer returns the cached trailer metedata. Note that if it is not called
377// after the entire stream is done, it could return an empty MD. Client
378// side only.
379// It can be safely read only after stream has ended that is either read
380// or write have returned io.EOF.
381func (s *Stream) Trailer() metadata.MD {
382 c := s.trailer.Copy()
383 return c
384}
385
386// ContentSubtype returns the content-subtype for a request. For example, a
387// content-subtype of "proto" will result in a content-type of
388// "application/grpc+proto". This will always be lowercase. See
389// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
390// more details.
391func (s *Stream) ContentSubtype() string {
392 return s.contentSubtype
393}
394
395// Context returns the context of the stream.
396func (s *Stream) Context() context.Context {
397 return s.ctx
398}
399
400// Method returns the method for the stream.
401func (s *Stream) Method() string {
402 return s.method
403}
404
405// Status returns the status received from the server.
406// Status can be read safely only after the stream has ended,
407// that is, after Done() is closed.
408func (s *Stream) Status() *status.Status {
409 return s.status
410}
411
412// SetHeader sets the header metadata. This can be called multiple times.
413// Server side only.
414// This should not be called in parallel to other data writes.
415func (s *Stream) SetHeader(md metadata.MD) error {
416 if md.Len() == 0 {
417 return nil
418 }
419 if s.isHeaderSent() || s.getState() == streamDone {
420 return ErrIllegalHeaderWrite
421 }
422 s.hdrMu.Lock()
423 s.header = metadata.Join(s.header, md)
424 s.hdrMu.Unlock()
425 return nil
426}
427
428// SendHeader sends the given header metadata. The given metadata is
429// combined with any metadata set by previous calls to SetHeader and
430// then written to the transport stream.
431func (s *Stream) SendHeader(md metadata.MD) error {
432 return s.st.WriteHeader(s, md)
433}
434
435// SetTrailer sets the trailer metadata which will be sent with the RPC status
436// by the server. This can be called multiple times. Server side only.
437// This should not be called parallel to other data writes.
438func (s *Stream) SetTrailer(md metadata.MD) error {
439 if md.Len() == 0 {
440 return nil
441 }
442 if s.getState() == streamDone {
443 return ErrIllegalHeaderWrite
444 }
445 s.hdrMu.Lock()
446 s.trailer = metadata.Join(s.trailer, md)
447 s.hdrMu.Unlock()
448 return nil
449}
450
451func (s *Stream) write(m recvMsg) {
452 s.buf.put(m)
453}
454
455// Read reads all p bytes from the wire for this stream.
456func (s *Stream) Read(p []byte) (n int, err error) {
457 // Don't request a read if there was an error earlier
458 if er := s.trReader.(*transportReader).er; er != nil {
459 return 0, er
460 }
461 s.requestRead(len(p))
462 return io.ReadFull(s.trReader, p)
463}
464
465// tranportReader reads all the data available for this Stream from the transport and
466// passes them into the decoder, which converts them into a gRPC message stream.
467// The error is io.EOF when the stream is done or another non-nil error if
468// the stream broke.
469type transportReader struct {
470 reader io.Reader
471 // The handler to control the window update procedure for both this
472 // particular stream and the associated transport.
473 windowHandler func(int)
474 er error
475}
476
477func (t *transportReader) Read(p []byte) (n int, err error) {
478 n, err = t.reader.Read(p)
479 if err != nil {
480 t.er = err
481 return
482 }
483 t.windowHandler(n)
484 return
485}
486
487// BytesReceived indicates whether any bytes have been received on this stream.
488func (s *Stream) BytesReceived() bool {
489 return atomic.LoadUint32(&s.bytesReceived) == 1
490}
491
492// Unprocessed indicates whether the server did not process this stream --
493// i.e. it sent a refused stream or GOAWAY including this stream ID.
494func (s *Stream) Unprocessed() bool {
495 return atomic.LoadUint32(&s.unprocessed) == 1
496}
497
498// GoString is implemented by Stream so context.String() won't
499// race when printing %#v.
500func (s *Stream) GoString() string {
501 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
502}
503
504// state of transport
505type transportState int
506
507const (
508 reachable transportState = iota
509 closing
510 draining
511)
512
513// ServerConfig consists of all the configurations to establish a server transport.
514type ServerConfig struct {
515 MaxStreams uint32
516 AuthInfo credentials.AuthInfo
517 InTapHandle tap.ServerInHandle
518 StatsHandler stats.Handler
519 KeepaliveParams keepalive.ServerParameters
520 KeepalivePolicy keepalive.EnforcementPolicy
521 InitialWindowSize int32
522 InitialConnWindowSize int32
523 WriteBufferSize int
524 ReadBufferSize int
525 ChannelzParentID int64
526 MaxHeaderListSize *uint32
Don Newtone0d34a82019-11-14 10:58:06 -0500527 HeaderTableSize *uint32
Don Newton98fd8812019-09-23 15:15:02 -0400528}
529
530// NewServerTransport creates a ServerTransport with conn or non-nil error
531// if it fails.
532func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
533 return newHTTP2Server(conn, config)
534}
535
536// ConnectOptions covers all relevant options for communicating with the server.
537type ConnectOptions struct {
538 // UserAgent is the application user agent.
539 UserAgent string
540 // Dialer specifies how to dial a network address.
541 Dialer func(context.Context, string) (net.Conn, error)
542 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
543 FailOnNonTempDialError bool
544 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
545 PerRPCCredentials []credentials.PerRPCCredentials
546 // TransportCredentials stores the Authenticator required to setup a client
547 // connection. Only one of TransportCredentials and CredsBundle is non-nil.
548 TransportCredentials credentials.TransportCredentials
549 // CredsBundle is the credentials bundle to be used. Only one of
550 // TransportCredentials and CredsBundle is non-nil.
551 CredsBundle credentials.Bundle
552 // KeepaliveParams stores the keepalive parameters.
553 KeepaliveParams keepalive.ClientParameters
554 // StatsHandler stores the handler for stats.
555 StatsHandler stats.Handler
556 // InitialWindowSize sets the initial window size for a stream.
557 InitialWindowSize int32
558 // InitialConnWindowSize sets the initial window size for a connection.
559 InitialConnWindowSize int32
560 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
561 WriteBufferSize int
562 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
563 ReadBufferSize int
564 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
565 ChannelzParentID int64
566 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
567 MaxHeaderListSize *uint32
568}
569
570// TargetInfo contains the information of the target such as network address and metadata.
571type TargetInfo struct {
572 Addr string
573 Metadata interface{}
574 Authority string
575}
576
577// NewClientTransport establishes the transport with the required ConnectOptions
578// and returns it to the caller.
579func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
580 return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
581}
582
583// Options provides additional hints and information for message
584// transmission.
585type Options struct {
586 // Last indicates whether this write is the last piece for
587 // this stream.
588 Last bool
589}
590
591// CallHdr carries the information of a particular RPC.
592type CallHdr struct {
593 // Host specifies the peer's host.
594 Host string
595
596 // Method specifies the operation to perform.
597 Method string
598
599 // SendCompress specifies the compression algorithm applied on
600 // outbound message.
601 SendCompress string
602
603 // Creds specifies credentials.PerRPCCredentials for a call.
604 Creds credentials.PerRPCCredentials
605
606 // ContentSubtype specifies the content-subtype for a request. For example, a
607 // content-subtype of "proto" will result in a content-type of
608 // "application/grpc+proto". The value of ContentSubtype must be all
609 // lowercase, otherwise the behavior is undefined. See
610 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
611 // for more details.
612 ContentSubtype string
613
614 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
615}
616
617// ClientTransport is the common interface for all gRPC client-side transport
618// implementations.
619type ClientTransport interface {
620 // Close tears down this transport. Once it returns, the transport
621 // should not be accessed any more. The caller must make sure this
622 // is called only once.
623 Close() error
624
625 // GracefulClose starts to tear down the transport: the transport will stop
626 // accepting new RPCs and NewStream will return error. Once all streams are
627 // finished, the transport will close.
628 //
629 // It does not block.
630 GracefulClose()
631
632 // Write sends the data for the given stream. A nil stream indicates
633 // the write is to be performed on the transport as a whole.
634 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
635
636 // NewStream creates a Stream for an RPC.
637 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
638
639 // CloseStream clears the footprint of a stream when the stream is
640 // not needed any more. The err indicates the error incurred when
641 // CloseStream is called. Must be called when a stream is finished
642 // unless the associated transport is closing.
643 CloseStream(stream *Stream, err error)
644
645 // Error returns a channel that is closed when some I/O error
646 // happens. Typically the caller should have a goroutine to monitor
647 // this in order to take action (e.g., close the current transport
648 // and create a new one) in error case. It should not return nil
649 // once the transport is initiated.
650 Error() <-chan struct{}
651
652 // GoAway returns a channel that is closed when ClientTransport
653 // receives the draining signal from the server (e.g., GOAWAY frame in
654 // HTTP/2).
655 GoAway() <-chan struct{}
656
657 // GetGoAwayReason returns the reason why GoAway frame was received.
658 GetGoAwayReason() GoAwayReason
659
660 // RemoteAddr returns the remote network address.
661 RemoteAddr() net.Addr
662
663 // IncrMsgSent increments the number of message sent through this transport.
664 IncrMsgSent()
665
666 // IncrMsgRecv increments the number of message received through this transport.
667 IncrMsgRecv()
668}
669
670// ServerTransport is the common interface for all gRPC server-side transport
671// implementations.
672//
673// Methods may be called concurrently from multiple goroutines, but
674// Write methods for a given Stream will be called serially.
675type ServerTransport interface {
676 // HandleStreams receives incoming streams using the given handler.
677 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
678
679 // WriteHeader sends the header metadata for the given stream.
680 // WriteHeader may not be called on all streams.
681 WriteHeader(s *Stream, md metadata.MD) error
682
683 // Write sends the data for the given stream.
684 // Write may not be called on all streams.
685 Write(s *Stream, hdr []byte, data []byte, opts *Options) error
686
687 // WriteStatus sends the status of a stream to the client. WriteStatus is
688 // the final call made on a stream and always occurs.
689 WriteStatus(s *Stream, st *status.Status) error
690
691 // Close tears down the transport. Once it is called, the transport
692 // should not be accessed any more. All the pending streams and their
693 // handlers will be terminated asynchronously.
694 Close() error
695
696 // RemoteAddr returns the remote network address.
697 RemoteAddr() net.Addr
698
699 // Drain notifies the client this ServerTransport stops accepting new RPCs.
700 Drain()
701
702 // IncrMsgSent increments the number of message sent through this transport.
703 IncrMsgSent()
704
705 // IncrMsgRecv increments the number of message received through this transport.
706 IncrMsgRecv()
707}
708
709// connectionErrorf creates an ConnectionError with the specified error description.
710func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
711 return ConnectionError{
712 Desc: fmt.Sprintf(format, a...),
713 temp: temp,
714 err: e,
715 }
716}
717
718// ConnectionError is an error that results in the termination of the
719// entire connection and the retry of all the active streams.
720type ConnectionError struct {
721 Desc string
722 temp bool
723 err error
724}
725
726func (e ConnectionError) Error() string {
727 return fmt.Sprintf("connection error: desc = %q", e.Desc)
728}
729
730// Temporary indicates if this connection error is temporary or fatal.
731func (e ConnectionError) Temporary() bool {
732 return e.temp
733}
734
735// Origin returns the original error of this connection error.
736func (e ConnectionError) Origin() error {
737 // Never return nil error here.
738 // If the original error is nil, return itself.
739 if e.err == nil {
740 return e
741 }
742 return e.err
743}
744
745var (
746 // ErrConnClosing indicates that the transport is closing.
747 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
748 // errStreamDrain indicates that the stream is rejected because the
749 // connection is draining. This could be caused by goaway or balancer
750 // removing the address.
751 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
752 // errStreamDone is returned from write at the client side to indiacte application
753 // layer of an error.
754 errStreamDone = errors.New("the stream is done")
755 // StatusGoAway indicates that the server sent a GOAWAY that included this
756 // stream's ID in unprocessed RPCs.
757 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
758)
759
760// GoAwayReason contains the reason for the GoAway frame received.
761type GoAwayReason uint8
762
763const (
764 // GoAwayInvalid indicates that no GoAway frame is received.
765 GoAwayInvalid GoAwayReason = 0
766 // GoAwayNoReason is the default value when GoAway frame is received.
767 GoAwayNoReason GoAwayReason = 1
768 // GoAwayTooManyPings indicates that a GoAway frame with
769 // ErrCodeEnhanceYourCalm was received and that the debug data said
770 // "too_many_pings".
771 GoAwayTooManyPings GoAwayReason = 2
772)
773
774// channelzData is used to store channelz related data for http2Client and http2Server.
775// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
776// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
777// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
778type channelzData struct {
779 kpCount int64
780 // The number of streams that have started, including already finished ones.
781 streamsStarted int64
782 // Client side: The number of streams that have ended successfully by receiving
783 // EoS bit set frame from server.
784 // Server side: The number of streams that have ended successfully by sending
785 // frame with EoS bit set.
786 streamsSucceeded int64
787 streamsFailed int64
788 // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
789 // instead of time.Time since it's more costly to atomically update time.Time variable than int64
790 // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
791 lastStreamCreatedTime int64
792 msgSent int64
793 msgRecv int64
794 lastMsgSentTime int64
795 lastMsgRecvTime int64
796}
797
798// ContextErr converts the error from context package into a status error.
799func ContextErr(err error) error {
800 switch err {
801 case context.DeadlineExceeded:
802 return status.Error(codes.DeadlineExceeded, err.Error())
803 case context.Canceled:
804 return status.Error(codes.Canceled, err.Error())
805 }
806 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
807}