blob: ddee20b6bef2b12eaa9e95fbb531aac0e4f32564 [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "fmt"
24 "runtime"
25 "sync"
26 "sync/atomic"
27
28 "golang.org/x/net/http2"
29 "golang.org/x/net/http2/hpack"
30)
31
32var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
33 e.SetMaxDynamicTableSizeLimit(v)
34}
35
36type itemNode struct {
37 it interface{}
38 next *itemNode
39}
40
41type itemList struct {
42 head *itemNode
43 tail *itemNode
44}
45
46func (il *itemList) enqueue(i interface{}) {
47 n := &itemNode{it: i}
48 if il.tail == nil {
49 il.head, il.tail = n, n
50 return
51 }
52 il.tail.next = n
53 il.tail = n
54}
55
56// peek returns the first item in the list without removing it from the
57// list.
58func (il *itemList) peek() interface{} {
59 return il.head.it
60}
61
62func (il *itemList) dequeue() interface{} {
63 if il.head == nil {
64 return nil
65 }
66 i := il.head.it
67 il.head = il.head.next
68 if il.head == nil {
69 il.tail = nil
70 }
71 return i
72}
73
74func (il *itemList) dequeueAll() *itemNode {
75 h := il.head
76 il.head, il.tail = nil, nil
77 return h
78}
79
80func (il *itemList) isEmpty() bool {
81 return il.head == nil
82}
83
84// The following defines various control items which could flow through
85// the control buffer of transport. They represent different aspects of
86// control tasks, e.g., flow control, settings, streaming resetting, etc.
87
88// maxQueuedTransportResponseFrames is the most queued "transport response"
89// frames we will buffer before preventing new reads from occurring on the
90// transport. These are control frames sent in response to client requests,
91// such as RST_STREAM due to bad headers or settings acks.
92const maxQueuedTransportResponseFrames = 50
93
94type cbItem interface {
95 isTransportResponseFrame() bool
96}
97
98// registerStream is used to register an incoming stream with loopy writer.
99type registerStream struct {
100 streamID uint32
101 wq *writeQuota
102}
103
104func (*registerStream) isTransportResponseFrame() bool { return false }
105
106// headerFrame is also used to register stream on the client-side.
107type headerFrame struct {
108 streamID uint32
109 hf []hpack.HeaderField
Don Newtone0d34a82019-11-14 10:58:06 -0500110 endStream bool // Valid on server side.
111 initStream func(uint32) error // Used only on the client side.
Don Newton98fd8812019-09-23 15:15:02 -0400112 onWrite func()
113 wq *writeQuota // write quota for the stream created.
114 cleanup *cleanupStream // Valid on the server side.
115 onOrphaned func(error) // Valid on client-side
116}
117
118func (h *headerFrame) isTransportResponseFrame() bool {
119 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
120}
121
122type cleanupStream struct {
123 streamID uint32
124 rst bool
125 rstCode http2.ErrCode
126 onWrite func()
127}
128
129func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
130
131type dataFrame struct {
132 streamID uint32
133 endStream bool
134 h []byte
135 d []byte
136 // onEachWrite is called every time
137 // a part of d is written out.
138 onEachWrite func()
139}
140
141func (*dataFrame) isTransportResponseFrame() bool { return false }
142
143type incomingWindowUpdate struct {
144 streamID uint32
145 increment uint32
146}
147
148func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
149
150type outgoingWindowUpdate struct {
151 streamID uint32
152 increment uint32
153}
154
155func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
156 return false // window updates are throttled by thresholds
157}
158
159type incomingSettings struct {
160 ss []http2.Setting
161}
162
163func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
164
165type outgoingSettings struct {
166 ss []http2.Setting
167}
168
169func (*outgoingSettings) isTransportResponseFrame() bool { return false }
170
171type incomingGoAway struct {
172}
173
174func (*incomingGoAway) isTransportResponseFrame() bool { return false }
175
176type goAway struct {
177 code http2.ErrCode
178 debugData []byte
179 headsUp bool
180 closeConn bool
181}
182
183func (*goAway) isTransportResponseFrame() bool { return false }
184
185type ping struct {
186 ack bool
187 data [8]byte
188}
189
190func (*ping) isTransportResponseFrame() bool { return true }
191
192type outFlowControlSizeRequest struct {
193 resp chan uint32
194}
195
196func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
197
198type outStreamState int
199
200const (
201 active outStreamState = iota
202 empty
203 waitingOnStreamQuota
204)
205
206type outStream struct {
207 id uint32
208 state outStreamState
209 itl *itemList
210 bytesOutStanding int
211 wq *writeQuota
212
213 next *outStream
214 prev *outStream
215}
216
217func (s *outStream) deleteSelf() {
218 if s.prev != nil {
219 s.prev.next = s.next
220 }
221 if s.next != nil {
222 s.next.prev = s.prev
223 }
224 s.next, s.prev = nil, nil
225}
226
227type outStreamList struct {
228 // Following are sentinel objects that mark the
229 // beginning and end of the list. They do not
230 // contain any item lists. All valid objects are
231 // inserted in between them.
232 // This is needed so that an outStream object can
233 // deleteSelf() in O(1) time without knowing which
234 // list it belongs to.
235 head *outStream
236 tail *outStream
237}
238
239func newOutStreamList() *outStreamList {
240 head, tail := new(outStream), new(outStream)
241 head.next = tail
242 tail.prev = head
243 return &outStreamList{
244 head: head,
245 tail: tail,
246 }
247}
248
249func (l *outStreamList) enqueue(s *outStream) {
250 e := l.tail.prev
251 e.next = s
252 s.prev = e
253 s.next = l.tail
254 l.tail.prev = s
255}
256
257// remove from the beginning of the list.
258func (l *outStreamList) dequeue() *outStream {
259 b := l.head.next
260 if b == l.tail {
261 return nil
262 }
263 b.deleteSelf()
264 return b
265}
266
267// controlBuffer is a way to pass information to loopy.
268// Information is passed as specific struct types called control frames.
269// A control frame not only represents data, messages or headers to be sent out
270// but can also be used to instruct loopy to update its internal state.
271// It shouldn't be confused with an HTTP2 frame, although some of the control frames
272// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
273type controlBuffer struct {
274 ch chan struct{}
275 done <-chan struct{}
276 mu sync.Mutex
277 consumerWaiting bool
278 list *itemList
279 err error
280
281 // transportResponseFrames counts the number of queued items that represent
282 // the response of an action initiated by the peer. trfChan is created
283 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
284 // closed and nilled when transportResponseFrames drops below the
285 // threshold. Both fields are protected by mu.
286 transportResponseFrames int
287 trfChan atomic.Value // *chan struct{}
288}
289
290func newControlBuffer(done <-chan struct{}) *controlBuffer {
291 return &controlBuffer{
292 ch: make(chan struct{}, 1),
293 list: &itemList{},
294 done: done,
295 }
296}
297
298// throttle blocks if there are too many incomingSettings/cleanupStreams in the
299// controlbuf.
300func (c *controlBuffer) throttle() {
301 ch, _ := c.trfChan.Load().(*chan struct{})
302 if ch != nil {
303 select {
304 case <-*ch:
305 case <-c.done:
306 }
307 }
308}
309
310func (c *controlBuffer) put(it cbItem) error {
311 _, err := c.executeAndPut(nil, it)
312 return err
313}
314
315func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
316 var wakeUp bool
317 c.mu.Lock()
318 if c.err != nil {
319 c.mu.Unlock()
320 return false, c.err
321 }
322 if f != nil {
323 if !f(it) { // f wasn't successful
324 c.mu.Unlock()
325 return false, nil
326 }
327 }
328 if c.consumerWaiting {
329 wakeUp = true
330 c.consumerWaiting = false
331 }
332 c.list.enqueue(it)
333 if it.isTransportResponseFrame() {
334 c.transportResponseFrames++
335 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
336 // We are adding the frame that puts us over the threshold; create
337 // a throttling channel.
338 ch := make(chan struct{})
339 c.trfChan.Store(&ch)
340 }
341 }
342 c.mu.Unlock()
343 if wakeUp {
344 select {
345 case c.ch <- struct{}{}:
346 default:
347 }
348 }
349 return true, nil
350}
351
352// Note argument f should never be nil.
353func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
354 c.mu.Lock()
355 if c.err != nil {
356 c.mu.Unlock()
357 return false, c.err
358 }
359 if !f(it) { // f wasn't successful
360 c.mu.Unlock()
361 return false, nil
362 }
363 c.mu.Unlock()
364 return true, nil
365}
366
367func (c *controlBuffer) get(block bool) (interface{}, error) {
368 for {
369 c.mu.Lock()
370 if c.err != nil {
371 c.mu.Unlock()
372 return nil, c.err
373 }
374 if !c.list.isEmpty() {
375 h := c.list.dequeue().(cbItem)
376 if h.isTransportResponseFrame() {
377 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
378 // We are removing the frame that put us over the
379 // threshold; close and clear the throttling channel.
380 ch := c.trfChan.Load().(*chan struct{})
381 close(*ch)
382 c.trfChan.Store((*chan struct{})(nil))
383 }
384 c.transportResponseFrames--
385 }
386 c.mu.Unlock()
387 return h, nil
388 }
389 if !block {
390 c.mu.Unlock()
391 return nil, nil
392 }
393 c.consumerWaiting = true
394 c.mu.Unlock()
395 select {
396 case <-c.ch:
397 case <-c.done:
398 c.finish()
399 return nil, ErrConnClosing
400 }
401 }
402}
403
404func (c *controlBuffer) finish() {
405 c.mu.Lock()
406 if c.err != nil {
407 c.mu.Unlock()
408 return
409 }
410 c.err = ErrConnClosing
411 // There may be headers for streams in the control buffer.
412 // These streams need to be cleaned out since the transport
413 // is still not aware of these yet.
414 for head := c.list.dequeueAll(); head != nil; head = head.next {
415 hdr, ok := head.it.(*headerFrame)
416 if !ok {
417 continue
418 }
419 if hdr.onOrphaned != nil { // It will be nil on the server-side.
420 hdr.onOrphaned(ErrConnClosing)
421 }
422 }
423 c.mu.Unlock()
424}
425
426type side int
427
428const (
429 clientSide side = iota
430 serverSide
431)
432
433// Loopy receives frames from the control buffer.
434// Each frame is handled individually; most of the work done by loopy goes
435// into handling data frames. Loopy maintains a queue of active streams, and each
436// stream maintains a queue of data frames; as loopy receives data frames
437// it gets added to the queue of the relevant stream.
438// Loopy goes over this list of active streams by processing one node every iteration,
439// thereby closely resemebling to a round-robin scheduling over all streams. While
440// processing a stream, loopy writes out data bytes from this stream capped by the min
441// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
442type loopyWriter struct {
443 side side
444 cbuf *controlBuffer
445 sendQuota uint32
446 oiws uint32 // outbound initial window size.
447 // estdStreams is map of all established streams that are not cleaned-up yet.
448 // On client-side, this is all streams whose headers were sent out.
449 // On server-side, this is all streams whose headers were received.
450 estdStreams map[uint32]*outStream // Established streams.
451 // activeStreams is a linked-list of all streams that have data to send and some
452 // stream-level flow control quota.
453 // Each of these streams internally have a list of data items(and perhaps trailers
454 // on the server-side) to be sent out.
455 activeStreams *outStreamList
456 framer *framer
457 hBuf *bytes.Buffer // The buffer for HPACK encoding.
458 hEnc *hpack.Encoder // HPACK encoder.
459 bdpEst *bdpEstimator
460 draining bool
461
462 // Side-specific handlers
463 ssGoAwayHandler func(*goAway) (bool, error)
464}
465
466func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
467 var buf bytes.Buffer
468 l := &loopyWriter{
469 side: s,
470 cbuf: cbuf,
471 sendQuota: defaultWindowSize,
472 oiws: defaultWindowSize,
473 estdStreams: make(map[uint32]*outStream),
474 activeStreams: newOutStreamList(),
475 framer: fr,
476 hBuf: &buf,
477 hEnc: hpack.NewEncoder(&buf),
478 bdpEst: bdpEst,
479 }
480 return l
481}
482
483const minBatchSize = 1000
484
485// run should be run in a separate goroutine.
486// It reads control frames from controlBuf and processes them by:
487// 1. Updating loopy's internal state, or/and
488// 2. Writing out HTTP2 frames on the wire.
489//
490// Loopy keeps all active streams with data to send in a linked-list.
491// All streams in the activeStreams linked-list must have both:
492// 1. Data to send, and
493// 2. Stream level flow control quota available.
494//
495// In each iteration of run loop, other than processing the incoming control
496// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
497// This results in writing of HTTP2 frames into an underlying write buffer.
498// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
499// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
500// if the batch size is too low to give stream goroutines a chance to fill it up.
501func (l *loopyWriter) run() (err error) {
502 defer func() {
503 if err == ErrConnClosing {
504 // Don't log ErrConnClosing as error since it happens
505 // 1. When the connection is closed by some other known issue.
506 // 2. User closed the connection.
507 // 3. A graceful close of connection.
508 infof("transport: loopyWriter.run returning. %v", err)
509 err = nil
510 }
511 }()
512 for {
513 it, err := l.cbuf.get(true)
514 if err != nil {
515 return err
516 }
517 if err = l.handle(it); err != nil {
518 return err
519 }
520 if _, err = l.processData(); err != nil {
521 return err
522 }
523 gosched := true
524 hasdata:
525 for {
526 it, err := l.cbuf.get(false)
527 if err != nil {
528 return err
529 }
530 if it != nil {
531 if err = l.handle(it); err != nil {
532 return err
533 }
534 if _, err = l.processData(); err != nil {
535 return err
536 }
537 continue hasdata
538 }
539 isEmpty, err := l.processData()
540 if err != nil {
541 return err
542 }
543 if !isEmpty {
544 continue hasdata
545 }
546 if gosched {
547 gosched = false
548 if l.framer.writer.offset < minBatchSize {
549 runtime.Gosched()
550 continue hasdata
551 }
552 }
553 l.framer.writer.Flush()
554 break hasdata
555
556 }
557 }
558}
559
560func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
561 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
562}
563
564func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
565 // Otherwise update the quota.
566 if w.streamID == 0 {
567 l.sendQuota += w.increment
568 return nil
569 }
570 // Find the stream and update it.
571 if str, ok := l.estdStreams[w.streamID]; ok {
572 str.bytesOutStanding -= int(w.increment)
573 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
574 str.state = active
575 l.activeStreams.enqueue(str)
576 return nil
577 }
578 }
579 return nil
580}
581
582func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
583 return l.framer.fr.WriteSettings(s.ss...)
584}
585
586func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
587 if err := l.applySettings(s.ss); err != nil {
588 return err
589 }
590 return l.framer.fr.WriteSettingsAck()
591}
592
593func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
594 str := &outStream{
595 id: h.streamID,
596 state: empty,
597 itl: &itemList{},
598 wq: h.wq,
599 }
600 l.estdStreams[h.streamID] = str
601 return nil
602}
603
604func (l *loopyWriter) headerHandler(h *headerFrame) error {
605 if l.side == serverSide {
606 str, ok := l.estdStreams[h.streamID]
607 if !ok {
608 warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
609 return nil
610 }
611 // Case 1.A: Server is responding back with headers.
612 if !h.endStream {
613 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
614 }
615 // else: Case 1.B: Server wants to close stream.
616
617 if str.state != empty { // either active or waiting on stream quota.
618 // add it str's list of items.
619 str.itl.enqueue(h)
620 return nil
621 }
622 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
623 return err
624 }
625 return l.cleanupStreamHandler(h.cleanup)
626 }
627 // Case 2: Client wants to originate stream.
628 str := &outStream{
629 id: h.streamID,
630 state: empty,
631 itl: &itemList{},
632 wq: h.wq,
633 }
634 str.itl.enqueue(h)
635 return l.originateStream(str)
636}
637
638func (l *loopyWriter) originateStream(str *outStream) error {
639 hdr := str.itl.dequeue().(*headerFrame)
Don Newtone0d34a82019-11-14 10:58:06 -0500640 if err := hdr.initStream(str.id); err != nil {
Don Newton98fd8812019-09-23 15:15:02 -0400641 if err == ErrConnClosing {
642 return err
643 }
644 // Other errors(errStreamDrain) need not close transport.
645 return nil
646 }
Don Newtone0d34a82019-11-14 10:58:06 -0500647 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
Don Newton98fd8812019-09-23 15:15:02 -0400648 return err
649 }
650 l.estdStreams[str.id] = str
Don Newton98fd8812019-09-23 15:15:02 -0400651 return nil
652}
653
654func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
655 if onWrite != nil {
656 onWrite()
657 }
658 l.hBuf.Reset()
659 for _, f := range hf {
660 if err := l.hEnc.WriteField(f); err != nil {
661 warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
662 }
663 }
664 var (
665 err error
666 endHeaders, first bool
667 )
668 first = true
669 for !endHeaders {
670 size := l.hBuf.Len()
671 if size > http2MaxFrameLen {
672 size = http2MaxFrameLen
673 } else {
674 endHeaders = true
675 }
676 if first {
677 first = false
678 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
679 StreamID: streamID,
680 BlockFragment: l.hBuf.Next(size),
681 EndStream: endStream,
682 EndHeaders: endHeaders,
683 })
684 } else {
685 err = l.framer.fr.WriteContinuation(
686 streamID,
687 endHeaders,
688 l.hBuf.Next(size),
689 )
690 }
691 if err != nil {
692 return err
693 }
694 }
695 return nil
696}
697
698func (l *loopyWriter) preprocessData(df *dataFrame) error {
699 str, ok := l.estdStreams[df.streamID]
700 if !ok {
701 return nil
702 }
703 // If we got data for a stream it means that
704 // stream was originated and the headers were sent out.
705 str.itl.enqueue(df)
706 if str.state == empty {
707 str.state = active
708 l.activeStreams.enqueue(str)
709 }
710 return nil
711}
712
713func (l *loopyWriter) pingHandler(p *ping) error {
714 if !p.ack {
715 l.bdpEst.timesnap(p.data)
716 }
717 return l.framer.fr.WritePing(p.ack, p.data)
718
719}
720
721func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
722 o.resp <- l.sendQuota
723 return nil
724}
725
726func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
727 c.onWrite()
728 if str, ok := l.estdStreams[c.streamID]; ok {
729 // On the server side it could be a trailers-only response or
730 // a RST_STREAM before stream initialization thus the stream might
731 // not be established yet.
732 delete(l.estdStreams, c.streamID)
733 str.deleteSelf()
734 }
735 if c.rst { // If RST_STREAM needs to be sent.
736 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
737 return err
738 }
739 }
740 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
741 return ErrConnClosing
742 }
743 return nil
744}
745
746func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
747 if l.side == clientSide {
748 l.draining = true
749 if len(l.estdStreams) == 0 {
750 return ErrConnClosing
751 }
752 }
753 return nil
754}
755
756func (l *loopyWriter) goAwayHandler(g *goAway) error {
757 // Handling of outgoing GoAway is very specific to side.
758 if l.ssGoAwayHandler != nil {
759 draining, err := l.ssGoAwayHandler(g)
760 if err != nil {
761 return err
762 }
763 l.draining = draining
764 }
765 return nil
766}
767
768func (l *loopyWriter) handle(i interface{}) error {
769 switch i := i.(type) {
770 case *incomingWindowUpdate:
771 return l.incomingWindowUpdateHandler(i)
772 case *outgoingWindowUpdate:
773 return l.outgoingWindowUpdateHandler(i)
774 case *incomingSettings:
775 return l.incomingSettingsHandler(i)
776 case *outgoingSettings:
777 return l.outgoingSettingsHandler(i)
778 case *headerFrame:
779 return l.headerHandler(i)
780 case *registerStream:
781 return l.registerStreamHandler(i)
782 case *cleanupStream:
783 return l.cleanupStreamHandler(i)
784 case *incomingGoAway:
785 return l.incomingGoAwayHandler(i)
786 case *dataFrame:
787 return l.preprocessData(i)
788 case *ping:
789 return l.pingHandler(i)
790 case *goAway:
791 return l.goAwayHandler(i)
792 case *outFlowControlSizeRequest:
793 return l.outFlowControlSizeRequestHandler(i)
794 default:
795 return fmt.Errorf("transport: unknown control message type %T", i)
796 }
797}
798
799func (l *loopyWriter) applySettings(ss []http2.Setting) error {
800 for _, s := range ss {
801 switch s.ID {
802 case http2.SettingInitialWindowSize:
803 o := l.oiws
804 l.oiws = s.Val
805 if o < l.oiws {
806 // If the new limit is greater make all depleted streams active.
807 for _, stream := range l.estdStreams {
808 if stream.state == waitingOnStreamQuota {
809 stream.state = active
810 l.activeStreams.enqueue(stream)
811 }
812 }
813 }
814 case http2.SettingHeaderTableSize:
815 updateHeaderTblSize(l.hEnc, s.Val)
816 }
817 }
818 return nil
819}
820
821// processData removes the first stream from active streams, writes out at most 16KB
822// of its data and then puts it at the end of activeStreams if there's still more data
823// to be sent and stream has some stream-level flow control.
824func (l *loopyWriter) processData() (bool, error) {
825 if l.sendQuota == 0 {
826 return true, nil
827 }
828 str := l.activeStreams.dequeue() // Remove the first stream.
829 if str == nil {
830 return true, nil
831 }
832 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
833 // A data item is represented by a dataFrame, since it later translates into
834 // multiple HTTP2 data frames.
835 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
836 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
837 // maximum possilbe HTTP2 frame size.
838
839 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
840 // Client sends out empty data frame with endStream = true
841 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
842 return false, err
843 }
844 str.itl.dequeue() // remove the empty data item from stream
845 if str.itl.isEmpty() {
846 str.state = empty
847 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
848 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
849 return false, err
850 }
851 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
852 return false, nil
853 }
854 } else {
855 l.activeStreams.enqueue(str)
856 }
857 return false, nil
858 }
859 var (
860 idx int
861 buf []byte
862 )
863 if len(dataItem.h) != 0 { // data header has not been written out yet.
864 buf = dataItem.h
865 } else {
866 idx = 1
867 buf = dataItem.d
868 }
869 size := http2MaxFrameLen
870 if len(buf) < size {
871 size = len(buf)
872 }
873 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
874 str.state = waitingOnStreamQuota
875 return false, nil
876 } else if strQuota < size {
877 size = strQuota
878 }
879
880 if l.sendQuota < uint32(size) { // connection-level flow control.
881 size = int(l.sendQuota)
882 }
883 // Now that outgoing flow controls are checked we can replenish str's write quota
884 str.wq.replenish(size)
885 var endStream bool
886 // If this is the last data message on this stream and all of it can be written in this iteration.
887 if dataItem.endStream && size == len(buf) {
888 // buf contains either data or it contains header but data is empty.
889 if idx == 1 || len(dataItem.d) == 0 {
890 endStream = true
891 }
892 }
893 if dataItem.onEachWrite != nil {
894 dataItem.onEachWrite()
895 }
896 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
897 return false, err
898 }
899 buf = buf[size:]
900 str.bytesOutStanding += size
901 l.sendQuota -= uint32(size)
902 if idx == 0 {
903 dataItem.h = buf
904 } else {
905 dataItem.d = buf
906 }
907
908 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
909 str.itl.dequeue()
910 }
911 if str.itl.isEmpty() {
912 str.state = empty
913 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
914 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
915 return false, err
916 }
917 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
918 return false, err
919 }
920 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
921 str.state = waitingOnStreamQuota
922 } else { // Otherwise add it back to the list of active streams.
923 l.activeStreams.enqueue(str)
924 }
925 return false, nil
926}