/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"errors"
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/status"
)

var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

type itemList struct {
	head *itemNode
	tail *itemNode
}

func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list. It must only be called on a non-empty list.
func (il *itemList) peek() interface{} {
	return il.head.it
}

func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}

// The following defines various control items which could flow through
// the control buffer of the transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

type cbItem interface {
	isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register a stream on the client side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on the server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on the client side.
}

func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type earlyAbortStream struct {
	httpStatus     uint32
	streamID       uint32
	contentSubtype string
	status         *status.Status
}

func (*earlyAbortStream) isTransportResponseFrame() bool { return false }

type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct {
}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
	ack  bool
	data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
	resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
	active outStreamState = iota
	empty
	waitingOnStreamQuota
)
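
// State transitions, as implemented in this file: registerStreamHandler and
// headerHandler create streams in the empty state; preprocessData moves an
// empty stream to active and adds it to activeStreams when data arrives;
// processData parks a stream in waitingOnStreamQuota when its stream-level
// quota is exhausted and marks it empty once its item list drains;
// incomingWindowUpdateHandler and applySettings move a waitingOnStreamQuota
// stream back to active when quota becomes available again.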

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

func (s *outStream) deleteSelf() {
	if s.prev != nil {
		s.prev.next = s.next
	}
	if s.next != nil {
		s.next.prev = s.prev
	}
	s.next, s.prev = nil, nil
}

type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}

func newOutStreamList() *outStreamList {
	head, tail := new(outStream), new(outStream)
	head.next = tail
	tail.prev = head
	return &outStreamList{
		head: head,
		tail: tail,
	}
}

func (l *outStreamList) enqueue(s *outStream) {
	e := l.tail.prev
	e.next = s
	s.prev = e
	s.next = l.tail
	l.tail.prev = s
}

// remove from the beginning of the list.
func (l *outStreamList) dequeue() *outStream {
	b := l.head.next
	if b == l.tail {
		return nil
	}
	b.deleteSelf()
	return b
}
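
// Together, outStreamList and itemList give loopy its round-robin scheduler:
// processData (below) dequeues a stream from the front of activeStreams,
// writes out at most one HTTP/2 DATA frame's worth of its queued items, and,
// if the stream still has data and quota left, enqueues it at the back again.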

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer. trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold. Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: done,
	}
}

// throttle blocks if there are too many queued transport response frames
// (e.g., the settings ACKs and RST_STREAMs produced by incomingSettings and
// cleanupStream items) in the controlbuf.
func (c *controlBuffer) throttle() {
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		select {
		case <-ch:
		case <-c.done:
		}
	}
}
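
// A rough usage sketch (illustrative only; the real reader loops live in
// http2_client.go and http2_server.go and differ in detail). The transport's
// reader goroutine calls throttle() before each frame read so a peer cannot
// build up an unbounded backlog of transport response frames, and it queues
// control items with put(); here t stands in for a transport:
//
//	for {
//		t.controlBuf.throttle()
//		frame, err := t.framer.fr.ReadFrame()
//		if err != nil {
//			return
//		}
//		switch f := frame.(type) {
//		case *http2.PingFrame:
//			t.controlBuf.put(&ping{ack: true, data: f.Data})
//		// ... other frame types are handled similarly.
//		}
//	}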

func (c *controlBuffer) put(it cbItem) error {
	_, err := c.executeAndPut(nil, it)
	return err
}

func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
	var wakeUp bool
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if f != nil {
		if !f(it) { // f wasn't successful
			c.mu.Unlock()
			return false, nil
		}
	}
	if c.consumerWaiting {
		wakeUp = true
		c.consumerWaiting = false
	}
	c.list.enqueue(it)
	if it.isTransportResponseFrame() {
		c.transportResponseFrames++
		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
			// We are adding the frame that puts us over the threshold; create
			// a throttling channel.
			c.trfChan.Store(make(chan struct{}))
		}
	}
	c.mu.Unlock()
	if wakeUp {
		select {
		case c.ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}

// Note argument f should never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if !f(it) { // f wasn't successful
		c.mu.Unlock()
		return false, nil
	}
	c.mu.Unlock()
	return true, nil
}

func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue().(cbItem)
			if h.isTransportResponseFrame() {
				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
					// We are removing the frame that put us over the
					// threshold; close and clear the throttling channel.
					ch := c.trfChan.Load().(chan struct{})
					close(ch)
					c.trfChan.Store((chan struct{})(nil))
				}
				c.transportResponseFrames--
			}
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
		case <-c.done:
			return nil, ErrConnClosing
		}
	}
}

func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is not aware of them yet.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hdr, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hdr.onOrphaned != nil { // It will be nil on the server side.
			hdr.onOrphaned(ErrConnClosing)
		}
	}
	// In case throttle() is currently in flight, it needs to be unblocked.
	// Otherwise, the transport may not close, since the transport is closed by
	// the reader encountering the connection error.
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		close(ch)
	}
	c.trfChan.Store((chan struct{})(nil))
	c.mu.Unlock()
}

type side int

const (
	clientSide side = iota
	serverSide
)

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// they get added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is a map of all established streams that are not cleaned up yet.
	// On the client side, this is all streams whose headers were sent out.
	// On the server side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally has a list of data items (and perhaps trailers
	// on the server side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
	var buf bytes.Buffer
	l := &loopyWriter{
		side:          s,
		cbuf:          cbuf,
		sendQuota:     defaultWindowSize,
		oiws:          defaultWindowSize,
		estdStreams:   make(map[uint32]*outStream),
		activeStreams: newOutStreamList(),
		framer:        fr,
		hBuf:          &buf,
		hEnc:          hpack.NewEncoder(&buf),
		bdpEst:        bdpEst,
	}
	return l
}
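
// A rough sketch of how a transport wires loopy up (illustrative only; the
// actual call sites are in http2_client.go and http2_server.go and handle
// shutdown and logging differently): the loopy writer is created with the
// transport's framer and control buffer and then run on its own goroutine;
// here t stands in for a transport:
//
//	t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
//	go func() {
//		if err := t.loopy.run(); err != nil && logger.V(logLevel) {
//			logger.Infof("transport: loopyWriter exited with error: %v", err)
//		}
//		t.conn.Close()
//	}()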

const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of the run loop, besides processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor once,
// if the batch size is too low, to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as an error since it happens when
			// 1. The connection is closed by some other known issue.
			// 2. The user closed the connection.
			// 3. The connection was closed gracefully.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", err)
			}
			err = nil
		}
	}()
	for {
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}

func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
	// A streamID of 0 refers to the connection-level flow-control window;
	// update the connection send quota.
	if w.streamID == 0 {
		l.sendQuota += w.increment
		return nil
	}
	// Find the stream and update it.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return nil
		}
	}
	return nil
}

func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}

func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	l.estdStreams[h.streamID] = str
	return nil
}

func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			}
			return nil
		}
		// Case 1.A: Server is responding back with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else: Case 1.B: Server wants to close the stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it to str's list of items.
			str.itl.enqueue(h)
			return nil
		}
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate a stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	str.itl.enqueue(h)
	return l.originateStream(str)
}

func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
	if err := hdr.initStream(str.id); err != nil {
		if err == ErrConnClosing {
			return err
		}
		// Other errors (e.g., errStreamDrain) need not close the transport.
		return nil
	}
	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	return nil
}

func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
	if onWrite != nil {
		onWrite()
	}
	l.hBuf.Reset()
	for _, f := range hf {
		if err := l.hEnc.WriteField(f); err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
			}
		}
	}
	var (
		err               error
		endHeaders, first bool
	)
	first = true
	for !endHeaders {
		size := l.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			first = false
			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: l.hBuf.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
		} else {
			err = l.framer.fr.WriteContinuation(
				streamID,
				endHeaders,
				l.hBuf.Next(size),
			)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func (l *loopyWriter) preprocessData(df *dataFrame) error {
	str, ok := l.estdStreams[df.streamID]
	if !ok {
		return nil
	}
	// If we got data for a stream, it means that the stream was originated
	// and the headers were sent out.
	str.itl.enqueue(df)
	if str.state == empty {
		str.state = active
		l.activeStreams.enqueue(str)
	}
	return nil
}

func (l *loopyWriter) pingHandler(p *ping) error {
	if !p.ack {
		l.bdpEst.timesnap(p.data)
	}
	return l.framer.fr.WritePing(p.ack, p.data)
}

func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}

func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization, thus the stream might
		// not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}

func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
	if l.side == clientSide {
		return errors.New("earlyAbortStream not handled on client")
	}
	// In case the caller forgets to set the http status, default to 200.
	if eas.httpStatus == 0 {
		eas.httpStatus = 200
	}
	headerFields := []hpack.HeaderField{
		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
	}

	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
		return err
	}
	return nil
}

func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
	if l.side == clientSide {
		l.draining = true
		if len(l.estdStreams) == 0 {
			return ErrConnClosing
		}
	}
	return nil
}

func (l *loopyWriter) goAwayHandler(g *goAway) error {
	// Handling of outgoing GoAway is very specific to side.
	if l.ssGoAwayHandler != nil {
		draining, err := l.ssGoAwayHandler(g)
		if err != nil {
			return err
		}
		l.draining = draining
	}
	return nil
}

func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *earlyAbortStream:
		return l.earlyAbortStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}

func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}

// processData removes the first stream from active streams, writes out at most 16KB
// of its data, and then puts it at the end of activeStreams if there's still more data
// to be sent and the stream has some stream-level flow control quota left.
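//
// Illustrative example (values assumed for the sake of the arithmetic): with
// oiws = 65535, str.bytesOutStanding = 65000 and l.sendQuota = 10000, the
// write below is capped at min(http2MaxFrameLen, 65535-65000, 10000) =
// min(16384, 535, 10000) = 535 bytes; the stream is then re-queued, or parked
// in waitingOnStreamQuota once its per-stream quota reaches zero.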
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item for this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers: h, which holds the grpc-message header, and d, which holds the actual data.
	// As an optimization to keep wire traffic low, data from d is copied into h to make a frame as big as the
	// maximum possible HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out an empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from the stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var (
		buf []byte
	)
	// Figure out the maximum size we can send
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			buf = dataItem.h
		} else {
			// We can add some data to the grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked, we can replenish str's write quota
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}