blob: 45532f8aeaab4def96bc8e44cd6a08ce7c397fcf [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "errors"
24 "fmt"
25 "runtime"
26 "strconv"
27 "sync"
28 "sync/atomic"
29
30 "golang.org/x/net/http2"
31 "golang.org/x/net/http2/hpack"
32 "google.golang.org/grpc/internal/grpcutil"
33 "google.golang.org/grpc/status"
34)
35
36var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
37 e.SetMaxDynamicTableSizeLimit(v)
38}
39
// itemNode is a single link of an itemList.
type itemNode struct {
	it   interface{}
	next *itemNode
}

// itemList is a singly linked FIFO queue of arbitrary items. The zero value
// is an empty list that is ready to use.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the end of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list. It returns nil when the list is empty, mirroring dequeue, instead of
// dereferencing a nil head.
func (il *itemList) peek() interface{} {
	if il.head == nil {
		return nil
	}
	return il.head.it
}

// dequeue removes and returns the first item in the list, or nil when the
// list is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		// The last node was removed; the tail must not keep pointing at it.
		il.tail = nil
	}
	return i
}

// dequeueAll empties the list and returns the former head node, so the
// caller can walk the detached chain through the next pointers. It returns
// nil when the list was already empty.
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

// isEmpty reports whether the list holds no items.
func (il *itemList) isEmpty() bool {
	return il.head == nil
}
87
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.
91
// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

// cbItem is the interface implemented by every control item that can be
// placed in a controlBuffer.
type cbItem interface {
	// isTransportResponseFrame reports whether the item counts toward
	// maxQueuedTransportResponseFrames while it is queued.
	isTransportResponseFrame() bool
}
101
102// registerStream is used to register an incoming stream with loopy writer.
103type registerStream struct {
104 streamID uint32
105 wq *writeQuota
106}
107
108func (*registerStream) isTransportResponseFrame() bool { return false }
109
110// headerFrame is also used to register stream on the client-side.
111type headerFrame struct {
112 streamID uint32
113 hf []hpack.HeaderField
114 endStream bool // Valid on server side.
115 initStream func(uint32) error // Used only on the client side.
116 onWrite func()
117 wq *writeQuota // write quota for the stream created.
118 cleanup *cleanupStream // Valid on the server side.
119 onOrphaned func(error) // Valid on client-side
120}
121
122func (h *headerFrame) isTransportResponseFrame() bool {
123 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
124}
125
126type cleanupStream struct {
127 streamID uint32
128 rst bool
129 rstCode http2.ErrCode
130 onWrite func()
131}
132
133func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
134
135type earlyAbortStream struct {
136 streamID uint32
137 contentSubtype string
138 status *status.Status
139}
140
141func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
142
// dataFrame is one data item queued on a stream. h and d are written out in
// that order, possibly split over several HTTP2 data frames.
type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite, when non-nil, is invoked each time a portion of the
	// item is written out.
	onEachWrite func()
}

// isTransportResponseFrame is always false for a dataFrame item.
func (*dataFrame) isTransportResponseFrame() bool {
	return false
}
154
// incomingWindowUpdate describes a window update received from the peer.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame is always false for an incomingWindowUpdate item.
func (*incomingWindowUpdate) isTransportResponseFrame() bool {
	return false
}

// outgoingWindowUpdate describes a window update to be sent to the peer.
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame is always false: window updates are throttled by
// thresholds.
func (*outgoingWindowUpdate) isTransportResponseFrame() bool { return false }
170
171type incomingSettings struct {
172 ss []http2.Setting
173}
174
175func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
176
177type outgoingSettings struct {
178 ss []http2.Setting
179}
180
181func (*outgoingSettings) isTransportResponseFrame() bool { return false }
182
183type incomingGoAway struct {
184}
185
186func (*incomingGoAway) isTransportResponseFrame() bool { return false }
187
188type goAway struct {
189 code http2.ErrCode
190 debugData []byte
191 headsUp bool
192 closeConn bool
193}
194
195func (*goAway) isTransportResponseFrame() bool { return false }
196
// ping describes a PING frame to be written, with its ack flag and 8-byte
// payload.
type ping struct {
	ack  bool
	data [8]byte
}

// isTransportResponseFrame is always true for a ping item.
func (*ping) isTransportResponseFrame() bool {
	return true
}
203
// outFlowControlSizeRequest asks loopy to report its current connection-level
// send quota on resp.
type outFlowControlSizeRequest struct {
	resp chan uint32
}

// isTransportResponseFrame is always false for an outFlowControlSizeRequest.
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool {
	return false
}
209
// outStreamState describes where an outStream stands with respect to having
// items to write and stream-level quota to write them with.
type outStreamState int

const (
	// active marks a stream that has been placed on loopy's activeStreams
	// list.
	active outStreamState = iota
	// empty marks a stream whose item list has nothing left to write.
	empty
	// waitingOnStreamQuota marks a stream that ran out of stream-level
	// flow control quota.
	waitingOnStreamQuota
)
217
218type outStream struct {
219 id uint32
220 state outStreamState
221 itl *itemList
222 bytesOutStanding int
223 wq *writeQuota
224
225 next *outStream
226 prev *outStream
227}
228
229func (s *outStream) deleteSelf() {
230 if s.prev != nil {
231 s.prev.next = s.next
232 }
233 if s.next != nil {
234 s.next.prev = s.prev
235 }
236 s.next, s.prev = nil, nil
237}
238
239type outStreamList struct {
240 // Following are sentinel objects that mark the
241 // beginning and end of the list. They do not
242 // contain any item lists. All valid objects are
243 // inserted in between them.
244 // This is needed so that an outStream object can
245 // deleteSelf() in O(1) time without knowing which
246 // list it belongs to.
247 head *outStream
248 tail *outStream
249}
250
251func newOutStreamList() *outStreamList {
252 head, tail := new(outStream), new(outStream)
253 head.next = tail
254 tail.prev = head
255 return &outStreamList{
256 head: head,
257 tail: tail,
258 }
259}
260
261func (l *outStreamList) enqueue(s *outStream) {
262 e := l.tail.prev
263 e.next = s
264 s.prev = e
265 s.next = l.tail
266 l.tail.prev = s
267}
268
269// remove from the beginning of the list.
270func (l *outStreamList) dequeue() *outStream {
271 b := l.head.next
272 if b == l.tail {
273 return nil
274 }
275 b.deleteSelf()
276 return b
277}
278
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	// ch wakes up a consumer blocked in get; it has a buffer of one.
	ch chan struct{}
	// done unblocks get and throttle once it is closed.
	done <-chan struct{}
	// mu guards consumerWaiting, list, err and transportResponseFrames.
	mu sync.Mutex
	// consumerWaiting is set by get before it blocks and is cleared by the
	// producer that takes responsibility for waking the consumer.
	consumerWaiting bool
	list            *itemList
	// err is set once by finish; afterwards every operation returns it.
	err error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer. trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold. transportResponseFrames is protected by mu; trfChan is
	// stored while holding mu but is loaded atomically, without mu, by
	// throttle.
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}
301
302func newControlBuffer(done <-chan struct{}) *controlBuffer {
303 return &controlBuffer{
304 ch: make(chan struct{}, 1),
305 list: &itemList{},
306 done: done,
307 }
308}
309
310// throttle blocks if there are too many incomingSettings/cleanupStreams in the
311// controlbuf.
312func (c *controlBuffer) throttle() {
313 ch, _ := c.trfChan.Load().(chan struct{})
314 if ch != nil {
315 select {
316 case <-ch:
317 case <-c.done:
318 }
319 }
320}
321
322func (c *controlBuffer) put(it cbItem) error {
323 _, err := c.executeAndPut(nil, it)
324 return err
325}
326
327func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
328 var wakeUp bool
329 c.mu.Lock()
330 if c.err != nil {
331 c.mu.Unlock()
332 return false, c.err
333 }
334 if f != nil {
335 if !f(it) { // f wasn't successful
336 c.mu.Unlock()
337 return false, nil
338 }
339 }
340 if c.consumerWaiting {
341 wakeUp = true
342 c.consumerWaiting = false
343 }
344 c.list.enqueue(it)
345 if it.isTransportResponseFrame() {
346 c.transportResponseFrames++
347 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
348 // We are adding the frame that puts us over the threshold; create
349 // a throttling channel.
350 c.trfChan.Store(make(chan struct{}))
351 }
352 }
353 c.mu.Unlock()
354 if wakeUp {
355 select {
356 case c.ch <- struct{}{}:
357 default:
358 }
359 }
360 return true, nil
361}
362
363// Note argument f should never be nil.
364func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
365 c.mu.Lock()
366 if c.err != nil {
367 c.mu.Unlock()
368 return false, c.err
369 }
370 if !f(it) { // f wasn't successful
371 c.mu.Unlock()
372 return false, nil
373 }
374 c.mu.Unlock()
375 return true, nil
376}
377
378func (c *controlBuffer) get(block bool) (interface{}, error) {
379 for {
380 c.mu.Lock()
381 if c.err != nil {
382 c.mu.Unlock()
383 return nil, c.err
384 }
385 if !c.list.isEmpty() {
386 h := c.list.dequeue().(cbItem)
387 if h.isTransportResponseFrame() {
388 if c.transportResponseFrames == maxQueuedTransportResponseFrames {
389 // We are removing the frame that put us over the
390 // threshold; close and clear the throttling channel.
391 ch := c.trfChan.Load().(chan struct{})
392 close(ch)
393 c.trfChan.Store((chan struct{})(nil))
394 }
395 c.transportResponseFrames--
396 }
397 c.mu.Unlock()
398 return h, nil
399 }
400 if !block {
401 c.mu.Unlock()
402 return nil, nil
403 }
404 c.consumerWaiting = true
405 c.mu.Unlock()
406 select {
407 case <-c.ch:
408 case <-c.done:
409 return nil, ErrConnClosing
410 }
411 }
412}
413
414func (c *controlBuffer) finish() {
415 c.mu.Lock()
416 if c.err != nil {
417 c.mu.Unlock()
418 return
419 }
420 c.err = ErrConnClosing
421 // There may be headers for streams in the control buffer.
422 // These streams need to be cleaned out since the transport
423 // is still not aware of these yet.
424 for head := c.list.dequeueAll(); head != nil; head = head.next {
425 hdr, ok := head.it.(*headerFrame)
426 if !ok {
427 continue
428 }
429 if hdr.onOrphaned != nil { // It will be nil on the server-side.
430 hdr.onOrphaned(ErrConnClosing)
431 }
432 }
433 // In case throttle() is currently in flight, it needs to be unblocked.
434 // Otherwise, the transport may not close, since the transport is closed by
435 // the reader encountering the connection error.
436 ch, _ := c.trfChan.Load().(chan struct{})
437 if ch != nil {
438 close(ch)
439 }
440 c.trfChan.Store((chan struct{})(nil))
441 c.mu.Unlock()
442}
443
// side identifies which end of the connection a loopyWriter is serving.
type side int

const (
	// clientSide is the side that originates streams.
	clientSide side = iota
	// serverSide is the side that responds on streams it received.
	serverSide
)
450
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side side
	cbuf *controlBuffer
	// sendQuota is the remaining connection-level quota: it grows on
	// incoming window updates for stream 0 and shrinks as data is written.
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	// draining, on the client side, makes loopy stop with ErrConnClosing
	// once no established streams remain.
	draining bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}
483
484func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
485 var buf bytes.Buffer
486 l := &loopyWriter{
487 side: s,
488 cbuf: cbuf,
489 sendQuota: defaultWindowSize,
490 oiws: defaultWindowSize,
491 estdStreams: make(map[uint32]*outStream),
492 activeStreams: newOutStreamList(),
493 framer: fr,
494 hBuf: &buf,
495 hEnc: hpack.NewEncoder(&buf),
496 bdpEst: bdpEst,
497 }
498 return l
499}
500
// minBatchSize is the threshold compared against the framer writer's offset:
// below it, run yields the processor once before flushing.
const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
//
// run only returns on error. ErrConnClosing is logged at info level and
// reported to the caller as a nil error; any other error is returned as is.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as error since it happens
			// 1. When the connection is closed by some other known issue.
			// 2. User closed the connection.
			// 3. A graceful close of connection.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", err)
			}
			err = nil
		}
	}()
	for {
		// Block until at least one control item is available. The err
		// declared here shadows the named result; every return below
		// assigns the named result explicitly.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows at most one yield per batch before flushing.
		gosched := true
	hasdata:
		for {
			// Drain the control buffer without blocking.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control items left; keep writing stream data for as
			// long as processData finds something to do.
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					// Small batch: give other goroutines one chance
					// to queue more work before flushing.
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata

		}
	}
}
579
580func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
581 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
582}
583
584func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
585 // Otherwise update the quota.
586 if w.streamID == 0 {
587 l.sendQuota += w.increment
588 return nil
589 }
590 // Find the stream and update it.
591 if str, ok := l.estdStreams[w.streamID]; ok {
592 str.bytesOutStanding -= int(w.increment)
593 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
594 str.state = active
595 l.activeStreams.enqueue(str)
596 return nil
597 }
598 }
599 return nil
600}
601
602func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
603 return l.framer.fr.WriteSettings(s.ss...)
604}
605
606func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
607 if err := l.applySettings(s.ss); err != nil {
608 return err
609 }
610 return l.framer.fr.WriteSettingsAck()
611}
612
613func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
614 str := &outStream{
615 id: h.streamID,
616 state: empty,
617 itl: &itemList{},
618 wq: h.wq,
619 }
620 l.estdStreams[h.streamID] = str
621 return nil
622}
623
624func (l *loopyWriter) headerHandler(h *headerFrame) error {
625 if l.side == serverSide {
626 str, ok := l.estdStreams[h.streamID]
627 if !ok {
628 if logger.V(logLevel) {
629 logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
630 }
631 return nil
632 }
633 // Case 1.A: Server is responding back with headers.
634 if !h.endStream {
635 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
636 }
637 // else: Case 1.B: Server wants to close stream.
638
639 if str.state != empty { // either active or waiting on stream quota.
640 // add it str's list of items.
641 str.itl.enqueue(h)
642 return nil
643 }
644 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
645 return err
646 }
647 return l.cleanupStreamHandler(h.cleanup)
648 }
649 // Case 2: Client wants to originate stream.
650 str := &outStream{
651 id: h.streamID,
652 state: empty,
653 itl: &itemList{},
654 wq: h.wq,
655 }
656 str.itl.enqueue(h)
657 return l.originateStream(str)
658}
659
660func (l *loopyWriter) originateStream(str *outStream) error {
661 hdr := str.itl.dequeue().(*headerFrame)
662 if err := hdr.initStream(str.id); err != nil {
663 if err == ErrConnClosing {
664 return err
665 }
666 // Other errors(errStreamDrain) need not close transport.
667 return nil
668 }
669 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
670 return err
671 }
672 l.estdStreams[str.id] = str
673 return nil
674}
675
676func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
677 if onWrite != nil {
678 onWrite()
679 }
680 l.hBuf.Reset()
681 for _, f := range hf {
682 if err := l.hEnc.WriteField(f); err != nil {
683 if logger.V(logLevel) {
684 logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
685 }
686 }
687 }
688 var (
689 err error
690 endHeaders, first bool
691 )
692 first = true
693 for !endHeaders {
694 size := l.hBuf.Len()
695 if size > http2MaxFrameLen {
696 size = http2MaxFrameLen
697 } else {
698 endHeaders = true
699 }
700 if first {
701 first = false
702 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
703 StreamID: streamID,
704 BlockFragment: l.hBuf.Next(size),
705 EndStream: endStream,
706 EndHeaders: endHeaders,
707 })
708 } else {
709 err = l.framer.fr.WriteContinuation(
710 streamID,
711 endHeaders,
712 l.hBuf.Next(size),
713 )
714 }
715 if err != nil {
716 return err
717 }
718 }
719 return nil
720}
721
722func (l *loopyWriter) preprocessData(df *dataFrame) error {
723 str, ok := l.estdStreams[df.streamID]
724 if !ok {
725 return nil
726 }
727 // If we got data for a stream it means that
728 // stream was originated and the headers were sent out.
729 str.itl.enqueue(df)
730 if str.state == empty {
731 str.state = active
732 l.activeStreams.enqueue(str)
733 }
734 return nil
735}
736
737func (l *loopyWriter) pingHandler(p *ping) error {
738 if !p.ack {
739 l.bdpEst.timesnap(p.data)
740 }
741 return l.framer.fr.WritePing(p.ack, p.data)
742
743}
744
745func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
746 o.resp <- l.sendQuota
747 return nil
748}
749
750func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
751 c.onWrite()
752 if str, ok := l.estdStreams[c.streamID]; ok {
753 // On the server side it could be a trailers-only response or
754 // a RST_STREAM before stream initialization thus the stream might
755 // not be established yet.
756 delete(l.estdStreams, c.streamID)
757 str.deleteSelf()
758 }
759 if c.rst { // If RST_STREAM needs to be sent.
760 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
761 return err
762 }
763 }
764 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
765 return ErrConnClosing
766 }
767 return nil
768}
769
770func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
771 if l.side == clientSide {
772 return errors.New("earlyAbortStream not handled on client")
773 }
774
775 headerFields := []hpack.HeaderField{
776 {Name: ":status", Value: "200"},
777 {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
778 {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
779 {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
780 }
781
782 if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
783 return err
784 }
785 return nil
786}
787
788func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
789 if l.side == clientSide {
790 l.draining = true
791 if len(l.estdStreams) == 0 {
792 return ErrConnClosing
793 }
794 }
795 return nil
796}
797
798func (l *loopyWriter) goAwayHandler(g *goAway) error {
799 // Handling of outgoing GoAway is very specific to side.
800 if l.ssGoAwayHandler != nil {
801 draining, err := l.ssGoAwayHandler(g)
802 if err != nil {
803 return err
804 }
805 l.draining = draining
806 }
807 return nil
808}
809
810func (l *loopyWriter) handle(i interface{}) error {
811 switch i := i.(type) {
812 case *incomingWindowUpdate:
813 return l.incomingWindowUpdateHandler(i)
814 case *outgoingWindowUpdate:
815 return l.outgoingWindowUpdateHandler(i)
816 case *incomingSettings:
817 return l.incomingSettingsHandler(i)
818 case *outgoingSettings:
819 return l.outgoingSettingsHandler(i)
820 case *headerFrame:
821 return l.headerHandler(i)
822 case *registerStream:
823 return l.registerStreamHandler(i)
824 case *cleanupStream:
825 return l.cleanupStreamHandler(i)
826 case *earlyAbortStream:
827 return l.earlyAbortStreamHandler(i)
828 case *incomingGoAway:
829 return l.incomingGoAwayHandler(i)
830 case *dataFrame:
831 return l.preprocessData(i)
832 case *ping:
833 return l.pingHandler(i)
834 case *goAway:
835 return l.goAwayHandler(i)
836 case *outFlowControlSizeRequest:
837 return l.outFlowControlSizeRequestHandler(i)
838 default:
839 return fmt.Errorf("transport: unknown control message type %T", i)
840 }
841}
842
843func (l *loopyWriter) applySettings(ss []http2.Setting) error {
844 for _, s := range ss {
845 switch s.ID {
846 case http2.SettingInitialWindowSize:
847 o := l.oiws
848 l.oiws = s.Val
849 if o < l.oiws {
850 // If the new limit is greater make all depleted streams active.
851 for _, stream := range l.estdStreams {
852 if stream.state == waitingOnStreamQuota {
853 stream.state = active
854 l.activeStreams.enqueue(stream)
855 }
856 }
857 }
858 case http2.SettingHeaderTableSize:
859 updateHeaderTblSize(l.hEnc, s.Val)
860 }
861 }
862 return nil
863}
864
865// processData removes the first stream from active streams, writes out at most 16KB
866// of its data and then puts it at the end of activeStreams if there's still more data
867// to be sent and stream has some stream-level flow control.
868func (l *loopyWriter) processData() (bool, error) {
869 if l.sendQuota == 0 {
870 return true, nil
871 }
872 str := l.activeStreams.dequeue() // Remove the first stream.
873 if str == nil {
874 return true, nil
875 }
876 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
877 // A data item is represented by a dataFrame, since it later translates into
878 // multiple HTTP2 data frames.
879 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
880 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
881 // maximum possilbe HTTP2 frame size.
882
883 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
884 // Client sends out empty data frame with endStream = true
885 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
886 return false, err
887 }
888 str.itl.dequeue() // remove the empty data item from stream
889 if str.itl.isEmpty() {
890 str.state = empty
891 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
892 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
893 return false, err
894 }
895 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
896 return false, nil
897 }
898 } else {
899 l.activeStreams.enqueue(str)
900 }
901 return false, nil
902 }
903 var (
904 buf []byte
905 )
906 // Figure out the maximum size we can send
907 maxSize := http2MaxFrameLen
908 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
909 str.state = waitingOnStreamQuota
910 return false, nil
911 } else if maxSize > strQuota {
912 maxSize = strQuota
913 }
914 if maxSize > int(l.sendQuota) { // connection-level flow control.
915 maxSize = int(l.sendQuota)
916 }
917 // Compute how much of the header and data we can send within quota and max frame length
918 hSize := min(maxSize, len(dataItem.h))
919 dSize := min(maxSize-hSize, len(dataItem.d))
920 if hSize != 0 {
921 if dSize == 0 {
922 buf = dataItem.h
923 } else {
924 // We can add some data to grpc message header to distribute bytes more equally across frames.
925 // Copy on the stack to avoid generating garbage
926 var localBuf [http2MaxFrameLen]byte
927 copy(localBuf[:hSize], dataItem.h)
928 copy(localBuf[hSize:], dataItem.d[:dSize])
929 buf = localBuf[:hSize+dSize]
930 }
931 } else {
932 buf = dataItem.d
933 }
934
935 size := hSize + dSize
936
937 // Now that outgoing flow controls are checked we can replenish str's write quota
938 str.wq.replenish(size)
939 var endStream bool
940 // If this is the last data message on this stream and all of it can be written in this iteration.
941 if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
942 endStream = true
943 }
944 if dataItem.onEachWrite != nil {
945 dataItem.onEachWrite()
946 }
947 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
948 return false, err
949 }
950 str.bytesOutStanding += size
951 l.sendQuota -= uint32(size)
952 dataItem.h = dataItem.h[hSize:]
953 dataItem.d = dataItem.d[dSize:]
954
955 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
956 str.itl.dequeue()
957 }
958 if str.itl.isEmpty() {
959 str.state = empty
960 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
961 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
962 return false, err
963 }
964 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
965 return false, err
966 }
967 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
968 str.state = waitingOnStreamQuota
969 } else { // Otherwise add it back to the list of active streams.
970 l.activeStreams.enqueue(str)
971 }
972 return false, nil
973}
974
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}