blob: 1c3459c2b4c5840f45404b1a7e77ddd1029f8e5a [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// This file is the implementation of a gRPC server using HTTP/2 which
20// uses the standard Go http2 Server implementation (via the
21// http.Handler interface), rather than speaking low-level HTTP/2
22// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
23
24package transport
25
26import (
27 "bytes"
28 "context"
29 "errors"
30 "fmt"
31 "io"
32 "net"
33 "net/http"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/golang/protobuf/proto"
39 "golang.org/x/net/http2"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
42 "google.golang.org/grpc/internal/grpcutil"
43 "google.golang.org/grpc/metadata"
44 "google.golang.org/grpc/peer"
45 "google.golang.org/grpc/stats"
46 "google.golang.org/grpc/status"
47)
48
49// NewServerHandlerTransport returns a ServerTransport handling gRPC
50// from inside an http.Handler. It requires that the http Server
51// supports HTTP/2.
52func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
53 if r.ProtoMajor != 2 {
54 return nil, errors.New("gRPC requires HTTP/2")
55 }
56 if r.Method != "POST" {
57 return nil, errors.New("invalid gRPC request method")
58 }
59 contentType := r.Header.Get("Content-Type")
60 // TODO: do we assume contentType is lowercase? we did before
61 contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
62 if !validContentType {
63 return nil, errors.New("invalid gRPC request content-type")
64 }
65 if _, ok := w.(http.Flusher); !ok {
66 return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
67 }
68
69 st := &serverHandlerTransport{
70 rw: w,
71 req: r,
72 closedCh: make(chan struct{}),
73 writes: make(chan func()),
74 contentType: contentType,
75 contentSubtype: contentSubtype,
76 stats: stats,
77 }
78
79 if v := r.Header.Get("grpc-timeout"); v != "" {
80 to, err := decodeTimeout(v)
81 if err != nil {
82 return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
83 }
84 st.timeoutSet = true
85 st.timeout = to
86 }
87
88 metakv := []string{"content-type", contentType}
89 if r.Host != "" {
90 metakv = append(metakv, ":authority", r.Host)
91 }
92 for k, vv := range r.Header {
93 k = strings.ToLower(k)
94 if isReservedHeader(k) && !isWhitelistedHeader(k) {
95 continue
96 }
97 for _, v := range vv {
98 v, err := decodeMetadataHeader(k, v)
99 if err != nil {
100 return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
101 }
102 metakv = append(metakv, k, v)
103 }
104 }
105 st.headerMD = metadata.Pairs(metakv...)
106
107 return st, nil
108}
109
110// serverHandlerTransport is an implementation of ServerTransport
111// which replies to exactly one gRPC request (exactly one HTTP request),
112// using the net/http.Handler interface. This http.Handler is guaranteed
113// at this point to be speaking over HTTP/2, so it's able to speak valid
114// gRPC.
115type serverHandlerTransport struct {
116 rw http.ResponseWriter
117 req *http.Request
118 timeoutSet bool
119 timeout time.Duration
120
121 headerMD metadata.MD
122
123 closeOnce sync.Once
124 closedCh chan struct{} // closed on Close
125
126 // writes is a channel of code to run serialized in the
127 // ServeHTTP (HandleStreams) goroutine. The channel is closed
128 // when WriteStatus is called.
129 writes chan func()
130
131 // block concurrent WriteStatus calls
132 // e.g. grpc/(*serverStream).SendMsg/RecvMsg
133 writeStatusMu sync.Mutex
134
135 // we just mirror the request content-type
136 contentType string
137 // we store both contentType and contentSubtype so we don't keep recreating them
138 // TODO make sure this is consistent across handler_server and http2_server
139 contentSubtype string
140
141 stats stats.Handler
142}
143
144func (ht *serverHandlerTransport) Close() {
145 ht.closeOnce.Do(ht.closeCloseChanOnce)
146}
147
148func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
149
150func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
151
152// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
153// the empty string if unknown.
154type strAddr string
155
156func (a strAddr) Network() string {
157 if a != "" {
158 // Per the documentation on net/http.Request.RemoteAddr, if this is
159 // set, it's set to the IP:port of the peer (hence, TCP):
160 // https://golang.org/pkg/net/http/#Request
161 //
162 // If we want to support Unix sockets later, we can
163 // add our own grpc-specific convention within the
164 // grpc codebase to set RemoteAddr to a different
165 // format, or probably better: we can attach it to the
166 // context and use that from serverHandlerTransport.RemoteAddr.
167 return "tcp"
168 }
169 return ""
170}
171
172func (a strAddr) String() string { return string(a) }
173
174// do runs fn in the ServeHTTP goroutine.
175func (ht *serverHandlerTransport) do(fn func()) error {
176 select {
177 case <-ht.closedCh:
178 return ErrConnClosing
179 case ht.writes <- fn:
180 return nil
181 }
182}
183
184func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
185 ht.writeStatusMu.Lock()
186 defer ht.writeStatusMu.Unlock()
187
188 headersWritten := s.updateHeaderSent()
189 err := ht.do(func() {
190 if !headersWritten {
191 ht.writePendingHeaders(s)
192 }
193
194 // And flush, in case no header or body has been sent yet.
195 // This forces a separation of headers and trailers if this is the
196 // first call (for example, in end2end tests's TestNoService).
197 ht.rw.(http.Flusher).Flush()
198
199 h := ht.rw.Header()
200 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
201 if m := st.Message(); m != "" {
202 h.Set("Grpc-Message", encodeGrpcMessage(m))
203 }
204
205 if p := st.Proto(); p != nil && len(p.Details) > 0 {
206 stBytes, err := proto.Marshal(p)
207 if err != nil {
208 // TODO: return error instead, when callers are able to handle it.
209 panic(err)
210 }
211
212 h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
213 }
214
215 if md := s.Trailer(); len(md) > 0 {
216 for k, vv := range md {
217 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
218 if isReservedHeader(k) {
219 continue
220 }
221 for _, v := range vv {
222 // http2 ResponseWriter mechanism to send undeclared Trailers after
223 // the headers have possibly been written.
224 h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
225 }
226 }
227 }
228 })
229
230 if err == nil { // transport has not been closed
231 if ht.stats != nil {
232 // Note: The trailer fields are compressed with hpack after this call returns.
233 // No WireLength field is set here.
234 ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
235 Trailer: s.trailer.Copy(),
236 })
237 }
238 }
239 ht.Close()
240 return err
241}
242
243// writePendingHeaders sets common and custom headers on the first
244// write call (Write, WriteHeader, or WriteStatus)
245func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
246 ht.writeCommonHeaders(s)
247 ht.writeCustomHeaders(s)
248}
249
250// writeCommonHeaders sets common headers on the first write
251// call (Write, WriteHeader, or WriteStatus).
252func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
253 h := ht.rw.Header()
254 h["Date"] = nil // suppress Date to make tests happy; TODO: restore
255 h.Set("Content-Type", ht.contentType)
256
257 // Predeclare trailers we'll set later in WriteStatus (after the body).
258 // This is a SHOULD in the HTTP RFC, and the way you add (known)
259 // Trailers per the net/http.ResponseWriter contract.
260 // See https://golang.org/pkg/net/http/#ResponseWriter
261 // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
262 h.Add("Trailer", "Grpc-Status")
263 h.Add("Trailer", "Grpc-Message")
264 h.Add("Trailer", "Grpc-Status-Details-Bin")
265
266 if s.sendCompress != "" {
267 h.Set("Grpc-Encoding", s.sendCompress)
268 }
269}
270
271// writeCustomHeaders sets custom headers set on the stream via SetHeader
272// on the first write call (Write, WriteHeader, or WriteStatus).
273func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
274 h := ht.rw.Header()
275
276 s.hdrMu.Lock()
277 for k, vv := range s.header {
278 if isReservedHeader(k) {
279 continue
280 }
281 for _, v := range vv {
282 h.Add(k, encodeMetadataHeader(k, v))
283 }
284 }
285
286 s.hdrMu.Unlock()
287}
288
289func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
290 headersWritten := s.updateHeaderSent()
291 return ht.do(func() {
292 if !headersWritten {
293 ht.writePendingHeaders(s)
294 }
295 ht.rw.Write(hdr)
296 ht.rw.Write(data)
297 ht.rw.(http.Flusher).Flush()
298 })
299}
300
301func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
302 if err := s.SetHeader(md); err != nil {
303 return err
304 }
305
306 headersWritten := s.updateHeaderSent()
307 err := ht.do(func() {
308 if !headersWritten {
309 ht.writePendingHeaders(s)
310 }
311
312 ht.rw.WriteHeader(200)
313 ht.rw.(http.Flusher).Flush()
314 })
315
316 if err == nil {
317 if ht.stats != nil {
318 // Note: The header fields are compressed with hpack after this call returns.
319 // No WireLength field is set here.
320 ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
321 Header: md.Copy(),
322 Compression: s.sendCompress,
323 })
324 }
325 }
326 return err
327}
328
329func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
330 // With this transport type there will be exactly 1 stream: this HTTP request.
331
332 ctx := ht.req.Context()
333 var cancel context.CancelFunc
334 if ht.timeoutSet {
335 ctx, cancel = context.WithTimeout(ctx, ht.timeout)
336 } else {
337 ctx, cancel = context.WithCancel(ctx)
338 }
339
340 // requestOver is closed when the status has been written via WriteStatus.
341 requestOver := make(chan struct{})
342 go func() {
343 select {
344 case <-requestOver:
345 case <-ht.closedCh:
346 case <-ht.req.Context().Done():
347 }
348 cancel()
349 ht.Close()
350 }()
351
352 req := ht.req
353
354 s := &Stream{
355 id: 0, // irrelevant
356 requestRead: func(int) {},
357 cancel: cancel,
358 buf: newRecvBuffer(),
359 st: ht,
360 method: req.URL.Path,
361 recvCompress: req.Header.Get("grpc-encoding"),
362 contentSubtype: ht.contentSubtype,
363 }
364 pr := &peer.Peer{
365 Addr: ht.RemoteAddr(),
366 }
367 if req.TLS != nil {
368 pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
369 }
370 ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
371 s.ctx = peer.NewContext(ctx, pr)
372 if ht.stats != nil {
373 s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
374 inHeader := &stats.InHeader{
375 FullMethod: s.method,
376 RemoteAddr: ht.RemoteAddr(),
377 Compression: s.recvCompress,
378 }
379 ht.stats.HandleRPC(s.ctx, inHeader)
380 }
381 s.trReader = &transportReader{
382 reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
383 windowHandler: func(int) {},
384 }
385
386 // readerDone is closed when the Body.Read-ing goroutine exits.
387 readerDone := make(chan struct{})
388 go func() {
389 defer close(readerDone)
390
391 // TODO: minimize garbage, optimize recvBuffer code/ownership
392 const readSize = 8196
393 for buf := make([]byte, readSize); ; {
394 n, err := req.Body.Read(buf)
395 if n > 0 {
396 s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
397 buf = buf[n:]
398 }
399 if err != nil {
400 s.buf.put(recvMsg{err: mapRecvMsgError(err)})
401 return
402 }
403 if len(buf) == 0 {
404 buf = make([]byte, readSize)
405 }
406 }
407 }()
408
409 // startStream is provided by the *grpc.Server's serveStreams.
410 // It starts a goroutine serving s and exits immediately.
411 // The goroutine that is started is the one that then calls
412 // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
413 startStream(s)
414
415 ht.runStream()
416 close(requestOver)
417
418 // Wait for reading goroutine to finish.
419 req.Body.Close()
420 <-readerDone
421}
422
423func (ht *serverHandlerTransport) runStream() {
424 for {
425 select {
426 case fn := <-ht.writes:
427 fn()
428 case <-ht.closedCh:
429 return
430 }
431 }
432}
433
434func (ht *serverHandlerTransport) IncrMsgSent() {}
435
436func (ht *serverHandlerTransport) IncrMsgRecv() {}
437
438func (ht *serverHandlerTransport) Drain() {
439 panic("Drain() is not implemented")
440}
441
442// mapRecvMsgError returns the non-nil err into the appropriate
443// error value as expected by callers of *grpc.parser.recvMsg.
444// In particular, in can only be:
445// * io.EOF
446// * io.ErrUnexpectedEOF
447// * of type transport.ConnectionError
448// * an error from the status package
449func mapRecvMsgError(err error) error {
450 if err == io.EOF || err == io.ErrUnexpectedEOF {
451 return err
452 }
453 if se, ok := err.(http2.StreamError); ok {
454 if code, ok := http2ErrConvTab[se.Code]; ok {
455 return status.Error(code, se.Error())
456 }
457 }
458 if strings.Contains(err.Error(), "body closed by handler") {
459 return status.Error(codes.Canceled, err.Error())
460 }
461 return connectionErrorf(true, err, err.Error())
462}