blob: 204ba1588bbfd5a0c6902e32a73a6fab32e9fcc3 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "fmt"
24 "runtime"
25 "sync"
26
27 "golang.org/x/net/http2"
28 "golang.org/x/net/http2/hpack"
29)
30
31var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
32 e.SetMaxDynamicTableSizeLimit(v)
33}
34
35type itemNode struct {
36 it interface{}
37 next *itemNode
38}
39
40type itemList struct {
41 head *itemNode
42 tail *itemNode
43}
44
45func (il *itemList) enqueue(i interface{}) {
46 n := &itemNode{it: i}
47 if il.tail == nil {
48 il.head, il.tail = n, n
49 return
50 }
51 il.tail.next = n
52 il.tail = n
53}
54
55// peek returns the first item in the list without removing it from the
56// list.
57func (il *itemList) peek() interface{} {
58 return il.head.it
59}
60
61func (il *itemList) dequeue() interface{} {
62 if il.head == nil {
63 return nil
64 }
65 i := il.head.it
66 il.head = il.head.next
67 if il.head == nil {
68 il.tail = nil
69 }
70 return i
71}
72
73func (il *itemList) dequeueAll() *itemNode {
74 h := il.head
75 il.head, il.tail = nil, nil
76 return h
77}
78
79func (il *itemList) isEmpty() bool {
80 return il.head == nil
81}
82
83// The following defines various control items which could flow through
84// the control buffer of transport. They represent different aspects of
85// control tasks, e.g., flow control, settings, streaming resetting, etc.
86
87// registerStream is used to register an incoming stream with loopy writer.
88type registerStream struct {
89 streamID uint32
90 wq *writeQuota
91}
92
93// headerFrame is also used to register stream on the client-side.
94type headerFrame struct {
95 streamID uint32
96 hf []hpack.HeaderField
97 endStream bool // Valid on server side.
98 initStream func(uint32) (bool, error) // Used only on the client side.
99 onWrite func()
100 wq *writeQuota // write quota for the stream created.
101 cleanup *cleanupStream // Valid on the server side.
102 onOrphaned func(error) // Valid on client-side
103}
104
105type cleanupStream struct {
106 streamID uint32
107 rst bool
108 rstCode http2.ErrCode
109 onWrite func()
110}
111
112type dataFrame struct {
113 streamID uint32
114 endStream bool
115 h []byte
116 d []byte
117 // onEachWrite is called every time
118 // a part of d is written out.
119 onEachWrite func()
120}
121
122type incomingWindowUpdate struct {
123 streamID uint32
124 increment uint32
125}
126
127type outgoingWindowUpdate struct {
128 streamID uint32
129 increment uint32
130}
131
132type incomingSettings struct {
133 ss []http2.Setting
134}
135
136type outgoingSettings struct {
137 ss []http2.Setting
138}
139
140type incomingGoAway struct {
141}
142
143type goAway struct {
144 code http2.ErrCode
145 debugData []byte
146 headsUp bool
147 closeConn bool
148}
149
150type ping struct {
151 ack bool
152 data [8]byte
153}
154
155type outFlowControlSizeRequest struct {
156 resp chan uint32
157}
158
159type outStreamState int
160
161const (
162 active outStreamState = iota
163 empty
164 waitingOnStreamQuota
165)
166
167type outStream struct {
168 id uint32
169 state outStreamState
170 itl *itemList
171 bytesOutStanding int
172 wq *writeQuota
173
174 next *outStream
175 prev *outStream
176}
177
178func (s *outStream) deleteSelf() {
179 if s.prev != nil {
180 s.prev.next = s.next
181 }
182 if s.next != nil {
183 s.next.prev = s.prev
184 }
185 s.next, s.prev = nil, nil
186}
187
188type outStreamList struct {
189 // Following are sentinel objects that mark the
190 // beginning and end of the list. They do not
191 // contain any item lists. All valid objects are
192 // inserted in between them.
193 // This is needed so that an outStream object can
194 // deleteSelf() in O(1) time without knowing which
195 // list it belongs to.
196 head *outStream
197 tail *outStream
198}
199
200func newOutStreamList() *outStreamList {
201 head, tail := new(outStream), new(outStream)
202 head.next = tail
203 tail.prev = head
204 return &outStreamList{
205 head: head,
206 tail: tail,
207 }
208}
209
210func (l *outStreamList) enqueue(s *outStream) {
211 e := l.tail.prev
212 e.next = s
213 s.prev = e
214 s.next = l.tail
215 l.tail.prev = s
216}
217
218// remove from the beginning of the list.
219func (l *outStreamList) dequeue() *outStream {
220 b := l.head.next
221 if b == l.tail {
222 return nil
223 }
224 b.deleteSelf()
225 return b
226}
227
228// controlBuffer is a way to pass information to loopy.
229// Information is passed as specific struct types called control frames.
230// A control frame not only represents data, messages or headers to be sent out
231// but can also be used to instruct loopy to update its internal state.
232// It shouldn't be confused with an HTTP2 frame, although some of the control frames
233// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
234type controlBuffer struct {
235 ch chan struct{}
236 done <-chan struct{}
237 mu sync.Mutex
238 consumerWaiting bool
239 list *itemList
240 err error
241}
242
243func newControlBuffer(done <-chan struct{}) *controlBuffer {
244 return &controlBuffer{
245 ch: make(chan struct{}, 1),
246 list: &itemList{},
247 done: done,
248 }
249}
250
251func (c *controlBuffer) put(it interface{}) error {
252 _, err := c.executeAndPut(nil, it)
253 return err
254}
255
256func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
257 var wakeUp bool
258 c.mu.Lock()
259 if c.err != nil {
260 c.mu.Unlock()
261 return false, c.err
262 }
263 if f != nil {
264 if !f(it) { // f wasn't successful
265 c.mu.Unlock()
266 return false, nil
267 }
268 }
269 if c.consumerWaiting {
270 wakeUp = true
271 c.consumerWaiting = false
272 }
273 c.list.enqueue(it)
274 c.mu.Unlock()
275 if wakeUp {
276 select {
277 case c.ch <- struct{}{}:
278 default:
279 }
280 }
281 return true, nil
282}
283
284// Note argument f should never be nil.
285func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
286 c.mu.Lock()
287 if c.err != nil {
288 c.mu.Unlock()
289 return false, c.err
290 }
291 if !f(it) { // f wasn't successful
292 c.mu.Unlock()
293 return false, nil
294 }
295 c.mu.Unlock()
296 return true, nil
297}
298
299func (c *controlBuffer) get(block bool) (interface{}, error) {
300 for {
301 c.mu.Lock()
302 if c.err != nil {
303 c.mu.Unlock()
304 return nil, c.err
305 }
306 if !c.list.isEmpty() {
307 h := c.list.dequeue()
308 c.mu.Unlock()
309 return h, nil
310 }
311 if !block {
312 c.mu.Unlock()
313 return nil, nil
314 }
315 c.consumerWaiting = true
316 c.mu.Unlock()
317 select {
318 case <-c.ch:
319 case <-c.done:
320 c.finish()
321 return nil, ErrConnClosing
322 }
323 }
324}
325
326func (c *controlBuffer) finish() {
327 c.mu.Lock()
328 if c.err != nil {
329 c.mu.Unlock()
330 return
331 }
332 c.err = ErrConnClosing
333 // There may be headers for streams in the control buffer.
334 // These streams need to be cleaned out since the transport
335 // is still not aware of these yet.
336 for head := c.list.dequeueAll(); head != nil; head = head.next {
337 hdr, ok := head.it.(*headerFrame)
338 if !ok {
339 continue
340 }
341 if hdr.onOrphaned != nil { // It will be nil on the server-side.
342 hdr.onOrphaned(ErrConnClosing)
343 }
344 }
345 c.mu.Unlock()
346}
347
348type side int
349
350const (
351 clientSide side = iota
352 serverSide
353)
354
355// Loopy receives frames from the control buffer.
356// Each frame is handled individually; most of the work done by loopy goes
357// into handling data frames. Loopy maintains a queue of active streams, and each
358// stream maintains a queue of data frames; as loopy receives data frames
359// it gets added to the queue of the relevant stream.
360// Loopy goes over this list of active streams by processing one node every iteration,
361// thereby closely resemebling to a round-robin scheduling over all streams. While
362// processing a stream, loopy writes out data bytes from this stream capped by the min
363// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
364type loopyWriter struct {
365 side side
366 cbuf *controlBuffer
367 sendQuota uint32
368 oiws uint32 // outbound initial window size.
369 // estdStreams is map of all established streams that are not cleaned-up yet.
370 // On client-side, this is all streams whose headers were sent out.
371 // On server-side, this is all streams whose headers were received.
372 estdStreams map[uint32]*outStream // Established streams.
373 // activeStreams is a linked-list of all streams that have data to send and some
374 // stream-level flow control quota.
375 // Each of these streams internally have a list of data items(and perhaps trailers
376 // on the server-side) to be sent out.
377 activeStreams *outStreamList
378 framer *framer
379 hBuf *bytes.Buffer // The buffer for HPACK encoding.
380 hEnc *hpack.Encoder // HPACK encoder.
381 bdpEst *bdpEstimator
382 draining bool
383
384 // Side-specific handlers
385 ssGoAwayHandler func(*goAway) (bool, error)
386}
387
388func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
389 var buf bytes.Buffer
390 l := &loopyWriter{
391 side: s,
392 cbuf: cbuf,
393 sendQuota: defaultWindowSize,
394 oiws: defaultWindowSize,
395 estdStreams: make(map[uint32]*outStream),
396 activeStreams: newOutStreamList(),
397 framer: fr,
398 hBuf: &buf,
399 hEnc: hpack.NewEncoder(&buf),
400 bdpEst: bdpEst,
401 }
402 return l
403}
404
405const minBatchSize = 1000
406
407// run should be run in a separate goroutine.
408// It reads control frames from controlBuf and processes them by:
409// 1. Updating loopy's internal state, or/and
410// 2. Writing out HTTP2 frames on the wire.
411//
412// Loopy keeps all active streams with data to send in a linked-list.
413// All streams in the activeStreams linked-list must have both:
414// 1. Data to send, and
415// 2. Stream level flow control quota available.
416//
417// In each iteration of run loop, other than processing the incoming control
418// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
419// This results in writing of HTTP2 frames into an underlying write buffer.
420// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
421// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
422// if the batch size is too low to give stream goroutines a chance to fill it up.
423func (l *loopyWriter) run() (err error) {
424 defer func() {
425 if err == ErrConnClosing {
426 // Don't log ErrConnClosing as error since it happens
427 // 1. When the connection is closed by some other known issue.
428 // 2. User closed the connection.
429 // 3. A graceful close of connection.
430 infof("transport: loopyWriter.run returning. %v", err)
431 err = nil
432 }
433 }()
434 for {
435 it, err := l.cbuf.get(true)
436 if err != nil {
437 return err
438 }
439 if err = l.handle(it); err != nil {
440 return err
441 }
442 if _, err = l.processData(); err != nil {
443 return err
444 }
445 gosched := true
446 hasdata:
447 for {
448 it, err := l.cbuf.get(false)
449 if err != nil {
450 return err
451 }
452 if it != nil {
453 if err = l.handle(it); err != nil {
454 return err
455 }
456 if _, err = l.processData(); err != nil {
457 return err
458 }
459 continue hasdata
460 }
461 isEmpty, err := l.processData()
462 if err != nil {
463 return err
464 }
465 if !isEmpty {
466 continue hasdata
467 }
468 if gosched {
469 gosched = false
470 if l.framer.writer.offset < minBatchSize {
471 runtime.Gosched()
472 continue hasdata
473 }
474 }
475 l.framer.writer.Flush()
476 break hasdata
477
478 }
479 }
480}
481
482func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
483 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
484}
485
486func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
487 // Otherwise update the quota.
488 if w.streamID == 0 {
489 l.sendQuota += w.increment
490 return nil
491 }
492 // Find the stream and update it.
493 if str, ok := l.estdStreams[w.streamID]; ok {
494 str.bytesOutStanding -= int(w.increment)
495 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
496 str.state = active
497 l.activeStreams.enqueue(str)
498 return nil
499 }
500 }
501 return nil
502}
503
504func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
505 return l.framer.fr.WriteSettings(s.ss...)
506}
507
508func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
509 if err := l.applySettings(s.ss); err != nil {
510 return err
511 }
512 return l.framer.fr.WriteSettingsAck()
513}
514
515func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
516 str := &outStream{
517 id: h.streamID,
518 state: empty,
519 itl: &itemList{},
520 wq: h.wq,
521 }
522 l.estdStreams[h.streamID] = str
523 return nil
524}
525
526func (l *loopyWriter) headerHandler(h *headerFrame) error {
527 if l.side == serverSide {
528 str, ok := l.estdStreams[h.streamID]
529 if !ok {
530 warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
531 return nil
532 }
533 // Case 1.A: Server is responding back with headers.
534 if !h.endStream {
535 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
536 }
537 // else: Case 1.B: Server wants to close stream.
538
539 if str.state != empty { // either active or waiting on stream quota.
540 // add it str's list of items.
541 str.itl.enqueue(h)
542 return nil
543 }
544 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
545 return err
546 }
547 return l.cleanupStreamHandler(h.cleanup)
548 }
549 // Case 2: Client wants to originate stream.
550 str := &outStream{
551 id: h.streamID,
552 state: empty,
553 itl: &itemList{},
554 wq: h.wq,
555 }
556 str.itl.enqueue(h)
557 return l.originateStream(str)
558}
559
560func (l *loopyWriter) originateStream(str *outStream) error {
561 hdr := str.itl.dequeue().(*headerFrame)
562 sendPing, err := hdr.initStream(str.id)
563 if err != nil {
564 if err == ErrConnClosing {
565 return err
566 }
567 // Other errors(errStreamDrain) need not close transport.
568 return nil
569 }
570 if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
571 return err
572 }
573 l.estdStreams[str.id] = str
574 if sendPing {
575 return l.pingHandler(&ping{data: [8]byte{}})
576 }
577 return nil
578}
579
580func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
581 if onWrite != nil {
582 onWrite()
583 }
584 l.hBuf.Reset()
585 for _, f := range hf {
586 if err := l.hEnc.WriteField(f); err != nil {
587 warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
588 }
589 }
590 var (
591 err error
592 endHeaders, first bool
593 )
594 first = true
595 for !endHeaders {
596 size := l.hBuf.Len()
597 if size > http2MaxFrameLen {
598 size = http2MaxFrameLen
599 } else {
600 endHeaders = true
601 }
602 if first {
603 first = false
604 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
605 StreamID: streamID,
606 BlockFragment: l.hBuf.Next(size),
607 EndStream: endStream,
608 EndHeaders: endHeaders,
609 })
610 } else {
611 err = l.framer.fr.WriteContinuation(
612 streamID,
613 endHeaders,
614 l.hBuf.Next(size),
615 )
616 }
617 if err != nil {
618 return err
619 }
620 }
621 return nil
622}
623
624func (l *loopyWriter) preprocessData(df *dataFrame) error {
625 str, ok := l.estdStreams[df.streamID]
626 if !ok {
627 return nil
628 }
629 // If we got data for a stream it means that
630 // stream was originated and the headers were sent out.
631 str.itl.enqueue(df)
632 if str.state == empty {
633 str.state = active
634 l.activeStreams.enqueue(str)
635 }
636 return nil
637}
638
639func (l *loopyWriter) pingHandler(p *ping) error {
640 if !p.ack {
641 l.bdpEst.timesnap(p.data)
642 }
643 return l.framer.fr.WritePing(p.ack, p.data)
644
645}
646
647func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
648 o.resp <- l.sendQuota
649 return nil
650}
651
652func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
653 c.onWrite()
654 if str, ok := l.estdStreams[c.streamID]; ok {
655 // On the server side it could be a trailers-only response or
656 // a RST_STREAM before stream initialization thus the stream might
657 // not be established yet.
658 delete(l.estdStreams, c.streamID)
659 str.deleteSelf()
660 }
661 if c.rst { // If RST_STREAM needs to be sent.
662 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
663 return err
664 }
665 }
666 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
667 return ErrConnClosing
668 }
669 return nil
670}
671
672func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
673 if l.side == clientSide {
674 l.draining = true
675 if len(l.estdStreams) == 0 {
676 return ErrConnClosing
677 }
678 }
679 return nil
680}
681
682func (l *loopyWriter) goAwayHandler(g *goAway) error {
683 // Handling of outgoing GoAway is very specific to side.
684 if l.ssGoAwayHandler != nil {
685 draining, err := l.ssGoAwayHandler(g)
686 if err != nil {
687 return err
688 }
689 l.draining = draining
690 }
691 return nil
692}
693
694func (l *loopyWriter) handle(i interface{}) error {
695 switch i := i.(type) {
696 case *incomingWindowUpdate:
697 return l.incomingWindowUpdateHandler(i)
698 case *outgoingWindowUpdate:
699 return l.outgoingWindowUpdateHandler(i)
700 case *incomingSettings:
701 return l.incomingSettingsHandler(i)
702 case *outgoingSettings:
703 return l.outgoingSettingsHandler(i)
704 case *headerFrame:
705 return l.headerHandler(i)
706 case *registerStream:
707 return l.registerStreamHandler(i)
708 case *cleanupStream:
709 return l.cleanupStreamHandler(i)
710 case *incomingGoAway:
711 return l.incomingGoAwayHandler(i)
712 case *dataFrame:
713 return l.preprocessData(i)
714 case *ping:
715 return l.pingHandler(i)
716 case *goAway:
717 return l.goAwayHandler(i)
718 case *outFlowControlSizeRequest:
719 return l.outFlowControlSizeRequestHandler(i)
720 default:
721 return fmt.Errorf("transport: unknown control message type %T", i)
722 }
723}
724
725func (l *loopyWriter) applySettings(ss []http2.Setting) error {
726 for _, s := range ss {
727 switch s.ID {
728 case http2.SettingInitialWindowSize:
729 o := l.oiws
730 l.oiws = s.Val
731 if o < l.oiws {
732 // If the new limit is greater make all depleted streams active.
733 for _, stream := range l.estdStreams {
734 if stream.state == waitingOnStreamQuota {
735 stream.state = active
736 l.activeStreams.enqueue(stream)
737 }
738 }
739 }
740 case http2.SettingHeaderTableSize:
741 updateHeaderTblSize(l.hEnc, s.Val)
742 }
743 }
744 return nil
745}
746
747// processData removes the first stream from active streams, writes out at most 16KB
748// of its data and then puts it at the end of activeStreams if there's still more data
749// to be sent and stream has some stream-level flow control.
750func (l *loopyWriter) processData() (bool, error) {
751 if l.sendQuota == 0 {
752 return true, nil
753 }
754 str := l.activeStreams.dequeue() // Remove the first stream.
755 if str == nil {
756 return true, nil
757 }
758 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
759 // A data item is represented by a dataFrame, since it later translates into
760 // multiple HTTP2 data frames.
761 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
762 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
763 // maximum possilbe HTTP2 frame size.
764
765 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
766 // Client sends out empty data frame with endStream = true
767 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
768 return false, err
769 }
770 str.itl.dequeue() // remove the empty data item from stream
771 if str.itl.isEmpty() {
772 str.state = empty
773 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
774 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
775 return false, err
776 }
777 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
778 return false, nil
779 }
780 } else {
781 l.activeStreams.enqueue(str)
782 }
783 return false, nil
784 }
785 var (
786 idx int
787 buf []byte
788 )
789 if len(dataItem.h) != 0 { // data header has not been written out yet.
790 buf = dataItem.h
791 } else {
792 idx = 1
793 buf = dataItem.d
794 }
795 size := http2MaxFrameLen
796 if len(buf) < size {
797 size = len(buf)
798 }
799 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
800 str.state = waitingOnStreamQuota
801 return false, nil
802 } else if strQuota < size {
803 size = strQuota
804 }
805
806 if l.sendQuota < uint32(size) { // connection-level flow control.
807 size = int(l.sendQuota)
808 }
809 // Now that outgoing flow controls are checked we can replenish str's write quota
810 str.wq.replenish(size)
811 var endStream bool
812 // If this is the last data message on this stream and all of it can be written in this iteration.
813 if dataItem.endStream && size == len(buf) {
814 // buf contains either data or it contains header but data is empty.
815 if idx == 1 || len(dataItem.d) == 0 {
816 endStream = true
817 }
818 }
819 if dataItem.onEachWrite != nil {
820 dataItem.onEachWrite()
821 }
822 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
823 return false, err
824 }
825 buf = buf[size:]
826 str.bytesOutStanding += size
827 l.sendQuota -= uint32(size)
828 if idx == 0 {
829 dataItem.h = buf
830 } else {
831 dataItem.d = buf
832 }
833
834 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
835 str.itl.dequeue()
836 }
837 if str.itl.isEmpty() {
838 str.state = empty
839 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
840 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
841 return false, err
842 }
843 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
844 return false, err
845 }
846 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
847 str.state = waitingOnStreamQuota
848 } else { // Otherwise add it back to the list of active streams.
849 l.activeStreams.enqueue(str)
850 }
851 return false, nil
852}