blob: 98f80e3fa00aa4cbdba74ce591e25b932a54c217 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// This file is the implementation of a gRPC server using HTTP/2 which
20// uses the standard Go http2 Server implementation (via the
21// http.Handler interface), rather than speaking low-level HTTP/2
22// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
23
24package transport
25
26import (
27 "bytes"
28 "context"
29 "errors"
30 "fmt"
31 "io"
32 "net"
33 "net/http"
34 "strings"
35 "sync"
36 "time"
37
38 "github.com/golang/protobuf/proto"
39 "golang.org/x/net/http2"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
Akash Kankanala761955c2024-02-21 19:32:20 +053042 "google.golang.org/grpc/internal/grpclog"
khenaidoo5fc5cea2021-08-11 17:39:16 -040043 "google.golang.org/grpc/internal/grpcutil"
44 "google.golang.org/grpc/metadata"
45 "google.golang.org/grpc/peer"
46 "google.golang.org/grpc/stats"
47 "google.golang.org/grpc/status"
48)
49
Akash Kankanala761955c2024-02-21 19:32:20 +053050// NewServerHandlerTransport returns a ServerTransport handling gRPC from
51// inside an http.Handler, or writes an HTTP error to w and returns an error.
52// It requires that the http Server supports HTTP/2.
53func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
khenaidoo5fc5cea2021-08-11 17:39:16 -040054 if r.ProtoMajor != 2 {
Akash Kankanala761955c2024-02-21 19:32:20 +053055 msg := "gRPC requires HTTP/2"
56 http.Error(w, msg, http.StatusBadRequest)
57 return nil, errors.New(msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -040058 }
59 if r.Method != "POST" {
Akash Kankanala761955c2024-02-21 19:32:20 +053060 msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
61 http.Error(w, msg, http.StatusBadRequest)
62 return nil, errors.New(msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -040063 }
64 contentType := r.Header.Get("Content-Type")
65 // TODO: do we assume contentType is lowercase? we did before
66 contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
67 if !validContentType {
Akash Kankanala761955c2024-02-21 19:32:20 +053068 msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
69 http.Error(w, msg, http.StatusUnsupportedMediaType)
70 return nil, errors.New(msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -040071 }
72 if _, ok := w.(http.Flusher); !ok {
Akash Kankanala761955c2024-02-21 19:32:20 +053073 msg := "gRPC requires a ResponseWriter supporting http.Flusher"
74 http.Error(w, msg, http.StatusInternalServerError)
75 return nil, errors.New(msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -040076 }
77
78 st := &serverHandlerTransport{
79 rw: w,
80 req: r,
81 closedCh: make(chan struct{}),
82 writes: make(chan func()),
83 contentType: contentType,
84 contentSubtype: contentSubtype,
85 stats: stats,
86 }
Akash Kankanala761955c2024-02-21 19:32:20 +053087 st.logger = prefixLoggerForServerHandlerTransport(st)
khenaidoo5fc5cea2021-08-11 17:39:16 -040088
89 if v := r.Header.Get("grpc-timeout"); v != "" {
90 to, err := decodeTimeout(v)
91 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +053092 msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
93 http.Error(w, msg, http.StatusBadRequest)
94 return nil, status.Error(codes.Internal, msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -040095 }
96 st.timeoutSet = true
97 st.timeout = to
98 }
99
100 metakv := []string{"content-type", contentType}
101 if r.Host != "" {
102 metakv = append(metakv, ":authority", r.Host)
103 }
104 for k, vv := range r.Header {
105 k = strings.ToLower(k)
106 if isReservedHeader(k) && !isWhitelistedHeader(k) {
107 continue
108 }
109 for _, v := range vv {
110 v, err := decodeMetadataHeader(k, v)
111 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530112 msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
113 http.Error(w, msg, http.StatusBadRequest)
114 return nil, status.Error(codes.Internal, msg)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400115 }
116 metakv = append(metakv, k, v)
117 }
118 }
119 st.headerMD = metadata.Pairs(metakv...)
120
121 return st, nil
122}
123
124// serverHandlerTransport is an implementation of ServerTransport
125// which replies to exactly one gRPC request (exactly one HTTP request),
126// using the net/http.Handler interface. This http.Handler is guaranteed
127// at this point to be speaking over HTTP/2, so it's able to speak valid
128// gRPC.
129type serverHandlerTransport struct {
130 rw http.ResponseWriter
131 req *http.Request
132 timeoutSet bool
133 timeout time.Duration
134
135 headerMD metadata.MD
136
137 closeOnce sync.Once
138 closedCh chan struct{} // closed on Close
139
140 // writes is a channel of code to run serialized in the
141 // ServeHTTP (HandleStreams) goroutine. The channel is closed
142 // when WriteStatus is called.
143 writes chan func()
144
145 // block concurrent WriteStatus calls
146 // e.g. grpc/(*serverStream).SendMsg/RecvMsg
147 writeStatusMu sync.Mutex
148
149 // we just mirror the request content-type
150 contentType string
151 // we store both contentType and contentSubtype so we don't keep recreating them
152 // TODO make sure this is consistent across handler_server and http2_server
153 contentSubtype string
154
Akash Kankanala761955c2024-02-21 19:32:20 +0530155 stats []stats.Handler
156 logger *grpclog.PrefixLogger
khenaidoo5fc5cea2021-08-11 17:39:16 -0400157}
158
Akash Kankanala761955c2024-02-21 19:32:20 +0530159func (ht *serverHandlerTransport) Close(err error) {
160 ht.closeOnce.Do(func() {
161 if ht.logger.V(logLevel) {
162 ht.logger.Infof("Closing: %v", err)
163 }
164 close(ht.closedCh)
165 })
khenaidoo5fc5cea2021-08-11 17:39:16 -0400166}
167
khenaidoo5fc5cea2021-08-11 17:39:16 -0400168func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
169
// strAddr adapts a plain string to the net.Addr method set. The string is
// either a TCP "ip:port" pair or empty when the address is unknown.
type strAddr string

// Network reports "tcp" for a non-empty address and "" for an empty one.
func (a strAddr) Network() string {
	if a == "" {
		return ""
	}
	// The net/http documentation for Request.RemoteAddr says that, when
	// set, it holds the peer's IP:port (hence TCP):
	// https://golang.org/pkg/net/http/#Request
	//
	// Should Unix sockets need support later, a grpc-specific convention
	// for RemoteAddr could be introduced, or (probably better) the address
	// could be attached to the context and read from there by
	// serverHandlerTransport.RemoteAddr.
	return "tcp"
}

// String returns the address text unchanged.
func (a strAddr) String() string {
	return string(a)
}
191
192// do runs fn in the ServeHTTP goroutine.
193func (ht *serverHandlerTransport) do(fn func()) error {
194 select {
195 case <-ht.closedCh:
196 return ErrConnClosing
197 case ht.writes <- fn:
198 return nil
199 }
200}
201
202func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
203 ht.writeStatusMu.Lock()
204 defer ht.writeStatusMu.Unlock()
205
206 headersWritten := s.updateHeaderSent()
207 err := ht.do(func() {
208 if !headersWritten {
209 ht.writePendingHeaders(s)
210 }
211
212 // And flush, in case no header or body has been sent yet.
213 // This forces a separation of headers and trailers if this is the
214 // first call (for example, in end2end tests's TestNoService).
215 ht.rw.(http.Flusher).Flush()
216
217 h := ht.rw.Header()
218 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
219 if m := st.Message(); m != "" {
220 h.Set("Grpc-Message", encodeGrpcMessage(m))
221 }
222
223 if p := st.Proto(); p != nil && len(p.Details) > 0 {
224 stBytes, err := proto.Marshal(p)
225 if err != nil {
226 // TODO: return error instead, when callers are able to handle it.
227 panic(err)
228 }
229
230 h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
231 }
232
233 if md := s.Trailer(); len(md) > 0 {
234 for k, vv := range md {
235 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
236 if isReservedHeader(k) {
237 continue
238 }
239 for _, v := range vv {
240 // http2 ResponseWriter mechanism to send undeclared Trailers after
241 // the headers have possibly been written.
242 h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
243 }
244 }
245 }
246 })
247
248 if err == nil { // transport has not been closed
Akash Kankanala761955c2024-02-21 19:32:20 +0530249 // Note: The trailer fields are compressed with hpack after this call returns.
250 // No WireLength field is set here.
251 for _, sh := range ht.stats {
252 sh.HandleRPC(s.Context(), &stats.OutTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400253 Trailer: s.trailer.Copy(),
254 })
255 }
256 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530257 ht.Close(errors.New("finished writing status"))
khenaidoo5fc5cea2021-08-11 17:39:16 -0400258 return err
259}
260
261// writePendingHeaders sets common and custom headers on the first
262// write call (Write, WriteHeader, or WriteStatus)
263func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
264 ht.writeCommonHeaders(s)
265 ht.writeCustomHeaders(s)
266}
267
268// writeCommonHeaders sets common headers on the first write
269// call (Write, WriteHeader, or WriteStatus).
270func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
271 h := ht.rw.Header()
272 h["Date"] = nil // suppress Date to make tests happy; TODO: restore
273 h.Set("Content-Type", ht.contentType)
274
275 // Predeclare trailers we'll set later in WriteStatus (after the body).
276 // This is a SHOULD in the HTTP RFC, and the way you add (known)
277 // Trailers per the net/http.ResponseWriter contract.
278 // See https://golang.org/pkg/net/http/#ResponseWriter
279 // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
280 h.Add("Trailer", "Grpc-Status")
281 h.Add("Trailer", "Grpc-Message")
282 h.Add("Trailer", "Grpc-Status-Details-Bin")
283
284 if s.sendCompress != "" {
285 h.Set("Grpc-Encoding", s.sendCompress)
286 }
287}
288
289// writeCustomHeaders sets custom headers set on the stream via SetHeader
290// on the first write call (Write, WriteHeader, or WriteStatus).
291func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
292 h := ht.rw.Header()
293
294 s.hdrMu.Lock()
295 for k, vv := range s.header {
296 if isReservedHeader(k) {
297 continue
298 }
299 for _, v := range vv {
300 h.Add(k, encodeMetadataHeader(k, v))
301 }
302 }
303
304 s.hdrMu.Unlock()
305}
306
307func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
308 headersWritten := s.updateHeaderSent()
309 return ht.do(func() {
310 if !headersWritten {
311 ht.writePendingHeaders(s)
312 }
313 ht.rw.Write(hdr)
314 ht.rw.Write(data)
315 ht.rw.(http.Flusher).Flush()
316 })
317}
318
319func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
320 if err := s.SetHeader(md); err != nil {
321 return err
322 }
323
324 headersWritten := s.updateHeaderSent()
325 err := ht.do(func() {
326 if !headersWritten {
327 ht.writePendingHeaders(s)
328 }
329
330 ht.rw.WriteHeader(200)
331 ht.rw.(http.Flusher).Flush()
332 })
333
334 if err == nil {
Akash Kankanala761955c2024-02-21 19:32:20 +0530335 for _, sh := range ht.stats {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400336 // Note: The header fields are compressed with hpack after this call returns.
337 // No WireLength field is set here.
Akash Kankanala761955c2024-02-21 19:32:20 +0530338 sh.HandleRPC(s.Context(), &stats.OutHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -0400339 Header: md.Copy(),
340 Compression: s.sendCompress,
341 })
342 }
343 }
344 return err
345}
346
347func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
348 // With this transport type there will be exactly 1 stream: this HTTP request.
349
350 ctx := ht.req.Context()
351 var cancel context.CancelFunc
352 if ht.timeoutSet {
353 ctx, cancel = context.WithTimeout(ctx, ht.timeout)
354 } else {
355 ctx, cancel = context.WithCancel(ctx)
356 }
357
358 // requestOver is closed when the status has been written via WriteStatus.
359 requestOver := make(chan struct{})
360 go func() {
361 select {
362 case <-requestOver:
363 case <-ht.closedCh:
364 case <-ht.req.Context().Done():
365 }
366 cancel()
Akash Kankanala761955c2024-02-21 19:32:20 +0530367 ht.Close(errors.New("request is done processing"))
khenaidoo5fc5cea2021-08-11 17:39:16 -0400368 }()
369
370 req := ht.req
371
372 s := &Stream{
373 id: 0, // irrelevant
374 requestRead: func(int) {},
375 cancel: cancel,
376 buf: newRecvBuffer(),
377 st: ht,
378 method: req.URL.Path,
379 recvCompress: req.Header.Get("grpc-encoding"),
380 contentSubtype: ht.contentSubtype,
381 }
382 pr := &peer.Peer{
383 Addr: ht.RemoteAddr(),
384 }
385 if req.TLS != nil {
386 pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
387 }
388 ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
389 s.ctx = peer.NewContext(ctx, pr)
Akash Kankanala761955c2024-02-21 19:32:20 +0530390 for _, sh := range ht.stats {
391 s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
khenaidoo5fc5cea2021-08-11 17:39:16 -0400392 inHeader := &stats.InHeader{
393 FullMethod: s.method,
394 RemoteAddr: ht.RemoteAddr(),
395 Compression: s.recvCompress,
396 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530397 sh.HandleRPC(s.ctx, inHeader)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400398 }
399 s.trReader = &transportReader{
400 reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
401 windowHandler: func(int) {},
402 }
403
404 // readerDone is closed when the Body.Read-ing goroutine exits.
405 readerDone := make(chan struct{})
406 go func() {
407 defer close(readerDone)
408
409 // TODO: minimize garbage, optimize recvBuffer code/ownership
410 const readSize = 8196
411 for buf := make([]byte, readSize); ; {
412 n, err := req.Body.Read(buf)
413 if n > 0 {
414 s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
415 buf = buf[n:]
416 }
417 if err != nil {
418 s.buf.put(recvMsg{err: mapRecvMsgError(err)})
419 return
420 }
421 if len(buf) == 0 {
422 buf = make([]byte, readSize)
423 }
424 }
425 }()
426
427 // startStream is provided by the *grpc.Server's serveStreams.
428 // It starts a goroutine serving s and exits immediately.
429 // The goroutine that is started is the one that then calls
430 // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
431 startStream(s)
432
433 ht.runStream()
434 close(requestOver)
435
436 // Wait for reading goroutine to finish.
437 req.Body.Close()
438 <-readerDone
439}
440
441func (ht *serverHandlerTransport) runStream() {
442 for {
443 select {
444 case fn := <-ht.writes:
445 fn()
446 case <-ht.closedCh:
447 return
448 }
449 }
450}
451
452func (ht *serverHandlerTransport) IncrMsgSent() {}
453
454func (ht *serverHandlerTransport) IncrMsgRecv() {}
455
Akash Kankanala761955c2024-02-21 19:32:20 +0530456func (ht *serverHandlerTransport) Drain(debugData string) {
khenaidoo5fc5cea2021-08-11 17:39:16 -0400457 panic("Drain() is not implemented")
458}
459
460// mapRecvMsgError returns the non-nil err into the appropriate
461// error value as expected by callers of *grpc.parser.recvMsg.
462// In particular, in can only be:
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500463// - io.EOF
464// - io.ErrUnexpectedEOF
465// - of type transport.ConnectionError
466// - an error from the status package
khenaidoo5fc5cea2021-08-11 17:39:16 -0400467func mapRecvMsgError(err error) error {
468 if err == io.EOF || err == io.ErrUnexpectedEOF {
469 return err
470 }
471 if se, ok := err.(http2.StreamError); ok {
472 if code, ok := http2ErrConvTab[se.Code]; ok {
473 return status.Error(code, se.Error())
474 }
475 }
476 if strings.Contains(err.Error(), "body closed by handler") {
477 return status.Error(codes.Canceled, err.Error())
478 }
479 return connectionErrorf(true, err, err.Error())
480}