blob: 8f5f3349d906300ca04c78e44f74de1d2f0accf1 [file] [log] [blame]
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package transport
20
21import (
22 "bufio"
23 "bytes"
24 "encoding/base64"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "net/http"
30 "strconv"
31 "strings"
32 "time"
33 "unicode/utf8"
34
35 "github.com/golang/protobuf/proto"
36 "golang.org/x/net/http2"
37 "golang.org/x/net/http2/hpack"
38 spb "google.golang.org/genproto/googleapis/rpc/status"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/status"
41)
42
const (
	// http2MaxFrameLen specifies the max length of a HTTP2 frame.
	http2MaxFrameLen = 16384 // 16KB frame
	// http2InitHeaderTableSize is the header table size handed to the HPACK
	// decoder that newFramer installs on the HTTP/2 framer. See
	// http://http2.github.io/http2-spec/#SettingValues
	http2InitHeaderTableSize = 4096
	// baseContentType is the base content-type for gRPC. This is a valid
	// content-type on its own, but can also include a content-subtype such as
	// "proto" as a suffix after "+" or ";". See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	baseContentType = "application/grpc"
)
55
var (
	// clientPreface holds the bytes of http2.ClientPreface.
	clientPreface = []byte(http2.ClientPreface)
	// http2ErrConvTab maps an HTTP/2 error code to a gRPC status code.
	http2ErrConvTab = map[http2.ErrCode]codes.Code{
		http2.ErrCodeNo:                 codes.Internal,
		http2.ErrCodeProtocol:           codes.Internal,
		http2.ErrCodeInternal:           codes.Internal,
		http2.ErrCodeFlowControl:        codes.ResourceExhausted,
		http2.ErrCodeSettingsTimeout:    codes.Internal,
		http2.ErrCodeStreamClosed:       codes.Internal,
		http2.ErrCodeFrameSize:          codes.Internal,
		http2.ErrCodeRefusedStream:      codes.Unavailable,
		http2.ErrCodeCancel:             codes.Canceled,
		http2.ErrCodeCompression:        codes.Internal,
		http2.ErrCodeConnect:            codes.Internal,
		http2.ErrCodeEnhanceYourCalm:    codes.ResourceExhausted,
		http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
		http2.ErrCodeHTTP11Required:     codes.Internal,
	}
	// statusCodeConvTab maps a gRPC status code to an HTTP/2 error code. Only
	// the codes listed here have an entry.
	statusCodeConvTab = map[codes.Code]http2.ErrCode{
		codes.Internal:          http2.ErrCodeInternal,
		codes.Canceled:          http2.ErrCodeCancel,
		codes.Unavailable:       http2.ErrCodeRefusedStream,
		codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
		codes.PermissionDenied:  http2.ErrCodeInadequateSecurity,
	}
	// HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
	// decodeHeader consults it in HTTP fallback mode; a status that has no
	// entry here is reported as codes.Unknown.
	HTTPStatusConvTab = map[int]codes.Code{
		// 400 Bad Request - INTERNAL.
		http.StatusBadRequest: codes.Internal,
		// 401 Unauthorized - UNAUTHENTICATED.
		http.StatusUnauthorized: codes.Unauthenticated,
		// 403 Forbidden - PERMISSION_DENIED.
		http.StatusForbidden: codes.PermissionDenied,
		// 404 Not Found - UNIMPLEMENTED.
		http.StatusNotFound: codes.Unimplemented,
		// 429 Too Many Requests - UNAVAILABLE.
		http.StatusTooManyRequests: codes.Unavailable,
		// 502 Bad Gateway - UNAVAILABLE.
		http.StatusBadGateway: codes.Unavailable,
		// 503 Service Unavailable - UNAVAILABLE.
		http.StatusServiceUnavailable: codes.Unavailable,
		// 504 Gateway timeout - UNAVAILABLE.
		http.StatusGatewayTimeout: codes.Unavailable,
	}
)
101
// parsedHeaderData holds everything extracted from one HEADERS frame by
// decodeState.processHeaderField.
type parsedHeaderData struct {
	// encoding is the value of the "grpc-encoding" header.
	encoding string
	// statusGen caches the stream status received from the trailer the server
	// sent. Client side only. Do not access directly. After all trailers are
	// parsed, use the status method to retrieve the status.
	statusGen *status.Status
	// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
	// intended for direct access outside of parsing.
	rawStatusCode *int
	rawStatusMsg  string
	// httpStatus is the parsed ":status" pseudo-header; nil if it was absent.
	httpStatus *int
	// Server side only fields.
	// timeoutSet is true once a "grpc-timeout" header has been seen (even if
	// its value failed to parse); timeout is the decoded value.
	timeoutSet bool
	timeout    time.Duration
	// method is the value of the ":path" pseudo-header.
	method string
	// key-value metadata map from the peer.
	mdata map[string][]string
	// statsTags and statsTrace are the decoded "grpc-tags-bin" and
	// "grpc-trace-bin" header values.
	statsTags  []byte
	statsTrace []byte
	// contentSubtype is the part of the content-type following
	// "application/grpc+" or "application/grpc;".
	contentSubtype string

	// isGRPC field indicates whether the peer is speaking gRPC (otherwise HTTP).
	//
	// We are in gRPC mode (peer speaking gRPC) if:
	// 	* We are client side and have already received a HEADER frame that indicates gRPC peer.
	//  * The header contains a valid content-type, i.e. a string that starts with "application/grpc"
	// And we should handle error specific to gRPC.
	//
	// Otherwise (i.e. a content-type string starts without "application/grpc", or does not exist), we
	// are in HTTP fallback mode, and should handle error specific to HTTP.
	isGRPC bool
	// grpcErr records a malformed gRPC-specific header; it is returned by
	// decodeHeader when isGRPC is true.
	grpcErr error
	// httpErr records a malformed ":status" header; it is returned by
	// decodeHeader in HTTP fallback mode.
	httpErr error
	// contentTypeErr describes an unexpected content-type value; it is
	// included in the HTTP fallback error message.
	contentTypeErr string
}
137
// decodeState configures decoding criteria and records the decoded data.
type decodeState struct {
	// whether decoding on server side or not. decodeHeader skips the
	// missing-status defaulting when this is true.
	serverSide bool

	// Records the states during HPACK decoding. It will be filled with info parsed from HTTP HEADERS
	// frame once decodeHeader function has been invoked and returned.
	data parsedHeaderData
}
147
// isReservedHeader reports whether hdr is one of the HTTP2 headers that the
// gRPC protocol reserves for itself. Every pseudo-header (a name starting
// with ':') is reserved. Headers that are not reserved are treated as
// user-specified metadata.
func isReservedHeader(hdr string) bool {
	if len(hdr) > 0 && hdr[0] == ':' {
		return true
	}
	switch hdr {
	case "content-type", "user-agent", "te":
		return true
	case "grpc-message-type",
		"grpc-encoding",
		"grpc-message",
		"grpc-status",
		"grpc-status-details-bin",
		"grpc-timeout":
		// grpc-previous-rpc-attempts and grpc-retry-pushback-ms are
		// intentionally absent: they are "reserved", but their API
		// intentionally works via metadata.
		return true
	}
	return false
}
173
// isWhitelistedHeader reports whether hdr is still propagated into the
// user-visible metadata even though isReservedHeader classifies it as
// reserved.
func isWhitelistedHeader(hdr string) bool {
	return hdr == ":authority" || hdr == "user-agent"
}
184
185// contentSubtype returns the content-subtype for the given content-type. The
186// given content-type must be a valid content-type that starts with
187// "application/grpc". A content-subtype will follow "application/grpc" after a
188// "+" or ";". See
189// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
190// more details.
191//
192// If contentType is not a valid content-type for gRPC, the boolean
193// will be false, otherwise true. If content-type == "application/grpc",
194// "application/grpc+", or "application/grpc;", the boolean will be true,
195// but no content-subtype will be returned.
196//
197// contentType is assumed to be lowercase already.
198func contentSubtype(contentType string) (string, bool) {
199 if contentType == baseContentType {
200 return "", true
201 }
202 if !strings.HasPrefix(contentType, baseContentType) {
203 return "", false
204 }
205 // guaranteed since != baseContentType and has baseContentType prefix
206 switch contentType[len(baseContentType)] {
207 case '+', ';':
208 // this will return true for "application/grpc+" or "application/grpc;"
209 // which the previous validContentType function tested to be valid, so we
210 // just say that no content-subtype is specified in this case
211 return contentType[len(baseContentType)+1:], true
212 default:
213 return "", false
214 }
215}
216
217// contentSubtype is assumed to be lowercase
218func contentType(contentSubtype string) string {
219 if contentSubtype == "" {
220 return baseContentType
221 }
222 return baseContentType + "+" + contentSubtype
223}
224
225func (d *decodeState) status() *status.Status {
226 if d.data.statusGen == nil {
227 // No status-details were provided; generate status using code/msg.
228 d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
229 }
230 return d.data.statusGen
231}
232
// binHdrSuffix is the key suffix that makes encodeMetadataHeader and
// decodeMetadataHeader base64-encode/decode a metadata value.
const binHdrSuffix = "-bin"
234
// encodeBinHeader returns v encoded as standard-alphabet base64 without
// padding.
func encodeBinHeader(v []byte) string {
	encoded := base64.RawStdEncoding.EncodeToString(v)
	return encoded
}
238
// decodeBinHeader decodes a base64 header value that may or may not carry
// padding. A value whose length is a multiple of four is decoded with the
// padded standard encoding (it was either padded or needed no padding); any
// other length is decoded with the unpadded standard encoding.
func decodeBinHeader(v string) ([]byte, error) {
	enc := base64.RawStdEncoding
	if len(v)%4 == 0 {
		enc = base64.StdEncoding
	}
	return enc.DecodeString(v)
}
246
247func encodeMetadataHeader(k, v string) string {
248 if strings.HasSuffix(k, binHdrSuffix) {
249 return encodeBinHeader(([]byte)(v))
250 }
251 return v
252}
253
254func decodeMetadataHeader(k, v string) (string, error) {
255 if strings.HasSuffix(k, binHdrSuffix) {
256 b, err := decodeBinHeader(v)
257 return string(b), err
258 }
259 return v, nil
260}
261
262func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
263 // frame.Truncated is set to true when framer detects that the current header
264 // list size hits MaxHeaderListSize limit.
265 if frame.Truncated {
266 return status.Error(codes.Internal, "peer header list size exceeded limit")
267 }
268
269 for _, hf := range frame.Fields {
270 d.processHeaderField(hf)
271 }
272
273 if d.data.isGRPC {
274 if d.data.grpcErr != nil {
275 return d.data.grpcErr
276 }
277 if d.serverSide {
278 return nil
279 }
280 if d.data.rawStatusCode == nil && d.data.statusGen == nil {
281 // gRPC status doesn't exist.
282 // Set rawStatusCode to be unknown and return nil error.
283 // So that, if the stream has ended this Unknown status
284 // will be propagated to the user.
285 // Otherwise, it will be ignored. In which case, status from
286 // a later trailer, that has StreamEnded flag set, is propagated.
287 code := int(codes.Unknown)
288 d.data.rawStatusCode = &code
289 }
290 return nil
291 }
292
293 // HTTP fallback mode
294 if d.data.httpErr != nil {
295 return d.data.httpErr
296 }
297
298 var (
299 code = codes.Internal // when header does not include HTTP status, return INTERNAL
300 ok bool
301 )
302
303 if d.data.httpStatus != nil {
304 code, ok = HTTPStatusConvTab[*(d.data.httpStatus)]
305 if !ok {
306 code = codes.Unknown
307 }
308 }
309
310 return status.Error(code, d.constructHTTPErrMsg())
311}
312
313// constructErrMsg constructs error message to be returned in HTTP fallback mode.
314// Format: HTTP status code and its corresponding message + content-type error message.
315func (d *decodeState) constructHTTPErrMsg() string {
316 var errMsgs []string
317
318 if d.data.httpStatus == nil {
319 errMsgs = append(errMsgs, "malformed header: missing HTTP status")
320 } else {
321 errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
322 }
323
324 if d.data.contentTypeErr == "" {
325 errMsgs = append(errMsgs, "transport: missing content-type field")
326 } else {
327 errMsgs = append(errMsgs, d.data.contentTypeErr)
328 }
329
330 return strings.Join(errMsgs, "; ")
331}
332
333func (d *decodeState) addMetadata(k, v string) {
334 if d.data.mdata == nil {
335 d.data.mdata = make(map[string][]string)
336 }
337 d.data.mdata[k] = append(d.data.mdata[k], v)
338}
339
// processHeaderField folds a single decoded header field into d.data.
//
// It never returns an error directly. Malformed gRPC headers are recorded in
// d.data.grpcErr, a malformed ":status" in d.data.httpErr, and an unexpected
// content-type in d.data.contentTypeErr; decodeHeader decides afterwards which
// of them to surface. Headers not handled by a dedicated case are added to the
// metadata map unless they are reserved and not whitelisted.
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
	switch f.Name {
	case "content-type":
		contentSubtype, validContentType := contentSubtype(f.Value)
		if !validContentType {
			// Leave isGRPC untouched so that decodeHeader can fall back to
			// HTTP error handling and report this message.
			d.data.contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", f.Value)
			return
		}
		d.data.contentSubtype = contentSubtype
		// TODO: do we want to propagate the whole content-type in the metadata,
		// or come up with a way to just propagate the content-subtype if it was set?
		// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
		// in the metadata?
		d.addMetadata(f.Name, f.Value)
		// A valid gRPC content-type is what switches decoding into gRPC mode.
		d.data.isGRPC = true
	case "grpc-encoding":
		d.data.encoding = f.Value
	case "grpc-status":
		code, err := strconv.Atoi(f.Value)
		if err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
			return
		}
		d.data.rawStatusCode = &code
	case "grpc-message":
		// The message arrives percent-encoded; store the decoded text.
		d.data.rawStatusMsg = decodeGrpcMessage(f.Value)
	case "grpc-status-details-bin":
		// The value is a base64-encoded, serialized status proto. Both the
		// base64 step and the unmarshal step report failure the same way.
		v, err := decodeBinHeader(f.Value)
		if err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
			return
		}
		s := &spb.Status{}
		if err := proto.Unmarshal(v, s); err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
			return
		}
		d.data.statusGen = status.FromProto(s)
	case "grpc-timeout":
		// timeoutSet is raised before parsing, so it stays true even when the
		// value turns out to be malformed.
		d.data.timeoutSet = true
		var err error
		if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
		}
	case ":path":
		d.data.method = f.Value
	case ":status":
		code, err := strconv.Atoi(f.Value)
		if err != nil {
			d.data.httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
			return
		}
		d.data.httpStatus = &code
	case "grpc-tags-bin":
		v, err := decodeBinHeader(f.Value)
		if err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
			return
		}
		// Kept both as a dedicated field and as a metadata entry.
		d.data.statsTags = v
		d.addMetadata(f.Name, string(v))
	case "grpc-trace-bin":
		v, err := decodeBinHeader(f.Value)
		if err != nil {
			d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
			return
		}
		// Kept both as a dedicated field and as a metadata entry.
		d.data.statsTrace = v
		d.addMetadata(f.Name, string(v))
	default:
		// Reserved headers are dropped unless explicitly whitelisted.
		if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
			break
		}
		v, err := decodeMetadataHeader(f.Name, f.Value)
		if err != nil {
			// An undecodable metadata value is passed to errorf and skipped;
			// it does not set grpcErr.
			errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
			return
		}
		d.addMetadata(f.Name, v)
	}
}
421
// timeoutUnit is the single-character unit suffix of a timeout string as
// produced by encodeTimeout and consumed by decodeTimeout.
type timeoutUnit uint8

const (
	hour        timeoutUnit = 'H'
	minute      timeoutUnit = 'M'
	second      timeoutUnit = 'S'
	millisecond timeoutUnit = 'm'
	microsecond timeoutUnit = 'u'
	nanosecond  timeoutUnit = 'n'
)

// timeoutUnitToDuration returns the time.Duration that one unit of u
// represents. ok is false, and d is zero, when u is not one of the known
// units.
func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
	switch u {
	case hour:
		d = time.Hour
	case minute:
		d = time.Minute
	case second:
		d = time.Second
	case millisecond:
		d = time.Millisecond
	case microsecond:
		d = time.Microsecond
	case nanosecond:
		d = time.Nanosecond
	default:
		return 0, false
	}
	return d, true
}
451
// maxTimeoutValue is the largest numeric value encodeTimeout emits for any
// unit below hours: the largest number that fits in eight decimal digits.
const maxTimeoutValue int64 = 100000000 - 1
453
454// div does integer division and round-up the result. Note that this is
455// equivalent to (d+r-1)/r but has less chance to overflow.
456func div(d, r time.Duration) int64 {
457 if m := d % r; m > 0 {
458 return int64(d/r + 1)
459 }
460 return int64(d / r)
461}
462
463// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
464func encodeTimeout(t time.Duration) string {
465 if t <= 0 {
466 return "0n"
467 }
468 if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
469 return strconv.FormatInt(d, 10) + "n"
470 }
471 if d := div(t, time.Microsecond); d <= maxTimeoutValue {
472 return strconv.FormatInt(d, 10) + "u"
473 }
474 if d := div(t, time.Millisecond); d <= maxTimeoutValue {
475 return strconv.FormatInt(d, 10) + "m"
476 }
477 if d := div(t, time.Second); d <= maxTimeoutValue {
478 return strconv.FormatInt(d, 10) + "S"
479 }
480 if d := div(t, time.Minute); d <= maxTimeoutValue {
481 return strconv.FormatInt(d, 10) + "M"
482 }
483 // Note that maxTimeoutValue * time.Hour > MaxInt64.
484 return strconv.FormatInt(div(t, time.Hour), 10) + "H"
485}
486
487func decodeTimeout(s string) (time.Duration, error) {
488 size := len(s)
489 if size < 2 {
490 return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
491 }
492 if size > 9 {
493 // Spec allows for 8 digits plus the unit.
494 return 0, fmt.Errorf("transport: timeout string is too long: %q", s)
495 }
496 unit := timeoutUnit(s[size-1])
497 d, ok := timeoutUnitToDuration(unit)
498 if !ok {
499 return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
500 }
501 t, err := strconv.ParseInt(s[:size-1], 10, 64)
502 if err != nil {
503 return 0, err
504 }
505 const maxHours = math.MaxInt64 / int64(time.Hour)
506 if d == time.Hour && t > maxHours {
507 // This timeout would overflow math.MaxInt64; clamp it.
508 return time.Duration(math.MaxInt64), nil
509 }
510 return d * time.Duration(t), nil
511}
512
// Byte values used by the grpc-message percent-encoding helpers: bytes in the
// range spaceByte..tildeByte other than percentByte are passed through
// unencoded, and percentByte introduces an escape.
const (
	spaceByte   = ' '
	tildeByte   = '~'
	percentByte = '%'
)
518
519// encodeGrpcMessage is used to encode status code in header field
520// "grpc-message". It does percent encoding and also replaces invalid utf-8
521// characters with Unicode replacement character.
522//
523// It checks to see if each individual byte in msg is an allowable byte, and
524// then either percent encoding or passing it through. When percent encoding,
525// the byte is converted into hexadecimal notation with a '%' prepended.
526func encodeGrpcMessage(msg string) string {
527 if msg == "" {
528 return ""
529 }
530 lenMsg := len(msg)
531 for i := 0; i < lenMsg; i++ {
532 c := msg[i]
533 if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
534 return encodeGrpcMessageUnchecked(msg)
535 }
536 }
537 return msg
538}
539
540func encodeGrpcMessageUnchecked(msg string) string {
541 var buf bytes.Buffer
542 for len(msg) > 0 {
543 r, size := utf8.DecodeRuneInString(msg)
544 for _, b := range []byte(string(r)) {
545 if size > 1 {
546 // If size > 1, r is not ascii. Always do percent encoding.
547 buf.WriteString(fmt.Sprintf("%%%02X", b))
548 continue
549 }
550
551 // The for loop is necessary even if size == 1. r could be
552 // utf8.RuneError.
553 //
554 // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
555 if b >= spaceByte && b <= tildeByte && b != percentByte {
556 buf.WriteByte(b)
557 } else {
558 buf.WriteString(fmt.Sprintf("%%%02X", b))
559 }
560 }
561 msg = msg[size:]
562 }
563 return buf.String()
564}
565
566// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
567func decodeGrpcMessage(msg string) string {
568 if msg == "" {
569 return ""
570 }
571 lenMsg := len(msg)
572 for i := 0; i < lenMsg; i++ {
573 if msg[i] == percentByte && i+2 < lenMsg {
574 return decodeGrpcMessageUnchecked(msg)
575 }
576 }
577 return msg
578}
579
580func decodeGrpcMessageUnchecked(msg string) string {
581 var buf bytes.Buffer
582 lenMsg := len(msg)
583 for i := 0; i < lenMsg; i++ {
584 c := msg[i]
585 if c == percentByte && i+2 < lenMsg {
586 parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
587 if err != nil {
588 buf.WriteByte(c)
589 } else {
590 buf.WriteByte(byte(parsed))
591 i += 2
592 }
593 } else {
594 buf.WriteByte(c)
595 }
596 }
597 return buf.String()
598}
599
// bufWriter batches writes to conn. Bytes are accumulated in buf and written
// to conn in one call once at least batchSize bytes are pending, or when Flush
// is called explicitly. A batchSize of zero disables buffering.
//
// The first error returned by conn while flushing is remembered in err; every
// later Write or Flush returns it without touching conn again.
type bufWriter struct {
	buf       []byte   // staging area, twice batchSize long
	offset    int      // number of pending bytes at the start of buf
	batchSize int      // flush threshold; 0 means unbuffered
	conn      net.Conn // destination of the buffered bytes
	err       error    // sticky error from a failed flush

	// onFlush, if set, is invoked right before pending bytes are written to
	// conn.
	onFlush func()
}

// newBufWriter returns a bufWriter that writes to conn in batches of at least
// batchSize bytes. The internal buffer is allocated at twice batchSize.
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
	return &bufWriter{
		buf:       make([]byte, batchSize*2),
		batchSize: batchSize,
		conn:      conn,
	}
}

// Write buffers b, flushing to conn whenever at least batchSize bytes are
// pending. With buffering disabled, b is written straight to conn.
//
// n is the number of bytes of b consumed into the buffer (or written, when
// unbuffered). If a flush fails, Write stops immediately and returns that
// error together with the bytes consumed so far; if the writer already holds
// an error from an earlier flush, it returns (0, that error).
func (w *bufWriter) Write(b []byte) (n int, err error) {
	if w.err != nil {
		return 0, w.err
	}
	if w.batchSize == 0 { // Buffer has been disabled.
		return w.conn.Write(b)
	}
	for len(b) > 0 {
		nn := copy(w.buf[w.offset:], b)
		b = b[nn:]
		w.offset += nn
		n += nn
		if w.offset >= w.batchSize {
			// Stop on the first flush error. After a failure Flush no longer
			// drains the buffer, so carrying on would fill buf completely and
			// then spin forever on zero-length copies.
			if err = w.Flush(); err != nil {
				return n, err
			}
		}
	}
	return n, nil
}

// Flush writes all pending bytes to conn and empties the buffer. It is a
// no-op when nothing is pending. A write error is stored and returned by this
// and every subsequent call.
func (w *bufWriter) Flush() error {
	if w.err != nil {
		return w.err
	}
	if w.offset == 0 {
		return nil
	}
	if w.onFlush != nil {
		w.onFlush()
	}
	_, w.err = w.conn.Write(w.buf[:w.offset])
	w.offset = 0
	return w.err
}
651
652type framer struct {
653 writer *bufWriter
654 fr *http2.Framer
655}
656
657func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
658 if writeBufferSize < 0 {
659 writeBufferSize = 0
660 }
661 var r io.Reader = conn
662 if readBufferSize > 0 {
663 r = bufio.NewReaderSize(r, readBufferSize)
664 }
665 w := newBufWriter(conn, writeBufferSize)
666 f := &framer{
667 writer: w,
668 fr: http2.NewFramer(w, r),
669 }
670 f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
671 // Opt-in to Frame reuse API on framer to reduce garbage.
672 // Frames aren't safe to read from after a subsequent call to ReadFrame.
673 f.fr.SetReuseFrames()
674 f.fr.MaxHeaderListSize = maxHeaderListSize
675 f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
676 return f
677}