blob: eadf9e05fd18df39854529d787a6e622fdd0a590 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal"
44 "google.golang.org/grpc/internal/binarylog"
45 "google.golang.org/grpc/internal/channelz"
46 "google.golang.org/grpc/internal/grpcrand"
47 "google.golang.org/grpc/internal/grpcsync"
48 "google.golang.org/grpc/internal/transport"
49 "google.golang.org/grpc/keepalive"
50 "google.golang.org/grpc/metadata"
51 "google.golang.org/grpc/peer"
52 "google.golang.org/grpc/stats"
53 "google.golang.org/grpc/status"
54 "google.golang.org/grpc/tap"
55)
56
57const (
58 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
59 defaultServerMaxSendMessageSize = math.MaxInt32
60
61 // Server transports are tracked in a map which is keyed on listener
62 // address. For regular gRPC traffic, connections are accepted in Serve()
63 // through a call to Accept(), and we use the actual listener address as key
64 // when we add it to the map. But for connections received through
65 // ServeHTTP(), we do not have a listener and hence use this dummy value.
66 listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
67)
68
69func init() {
70 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
71 return srv.opts.creds
72 }
73 internal.DrainServerTransports = func(srv *Server, addr string) {
74 srv.drainServerTransports(addr)
75 }
76}
77
78var statusOK = status.New(codes.OK, "")
79var logger = grpclog.Component("core")
80
81type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
82
83// MethodDesc represents an RPC service's method specification.
84type MethodDesc struct {
85 MethodName string
86 Handler methodHandler
87}
88
89// ServiceDesc represents an RPC service's specification.
90type ServiceDesc struct {
91 ServiceName string
92 // The pointer to the service interface. Used to check whether the user
93 // provided implementation satisfies the interface requirements.
94 HandlerType interface{}
95 Methods []MethodDesc
96 Streams []StreamDesc
97 Metadata interface{}
98}
99
100// serviceInfo wraps information about a service. It is very similar to
101// ServiceDesc and is constructed from it for internal purposes.
102type serviceInfo struct {
103 // Contains the implementation for the methods in this service.
104 serviceImpl interface{}
105 methods map[string]*MethodDesc
106 streams map[string]*StreamDesc
107 mdata interface{}
108}
109
110type serverWorkerData struct {
111 st transport.ServerTransport
112 wg *sync.WaitGroup
113 stream *transport.Stream
114}
115
116// Server is a gRPC server to serve RPC requests.
117type Server struct {
118 opts serverOptions
119
120 mu sync.Mutex // guards following
121 lis map[net.Listener]bool
122 // conns contains all active server transports. It is a map keyed on a
123 // listener address with the value being the set of active transports
124 // belonging to that listener.
125 conns map[string]map[transport.ServerTransport]bool
126 serve bool
127 drain bool
128 cv *sync.Cond // signaled when connections close for GracefulStop
129 services map[string]*serviceInfo // service name -> service info
130 events trace.EventLog
131
132 quit *grpcsync.Event
133 done *grpcsync.Event
134 channelzRemoveOnce sync.Once
135 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
136
137 channelzID int64 // channelz unique identification number
138 czData *channelzData
139
140 serverWorkerChannels []chan *serverWorkerData
141}
142
143type serverOptions struct {
144 creds credentials.TransportCredentials
145 codec baseCodec
146 cp Compressor
147 dc Decompressor
148 unaryInt UnaryServerInterceptor
149 streamInt StreamServerInterceptor
150 chainUnaryInts []UnaryServerInterceptor
151 chainStreamInts []StreamServerInterceptor
152 inTapHandle tap.ServerInHandle
153 statsHandler stats.Handler
154 maxConcurrentStreams uint32
155 maxReceiveMessageSize int
156 maxSendMessageSize int
157 unknownStreamDesc *StreamDesc
158 keepaliveParams keepalive.ServerParameters
159 keepalivePolicy keepalive.EnforcementPolicy
160 initialWindowSize int32
161 initialConnWindowSize int32
162 writeBufferSize int
163 readBufferSize int
164 connectionTimeout time.Duration
165 maxHeaderListSize *uint32
166 headerTableSize *uint32
167 numServerWorkers uint32
168}
169
170var defaultServerOptions = serverOptions{
171 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
172 maxSendMessageSize: defaultServerMaxSendMessageSize,
173 connectionTimeout: 120 * time.Second,
174 writeBufferSize: defaultWriteBufSize,
175 readBufferSize: defaultReadBufSize,
176}
177
178// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
179type ServerOption interface {
180 apply(*serverOptions)
181}
182
183// EmptyServerOption does not alter the server configuration. It can be embedded
184// in another structure to build custom server options.
185//
186// Experimental
187//
188// Notice: This type is EXPERIMENTAL and may be changed or removed in a
189// later release.
190type EmptyServerOption struct{}
191
192func (EmptyServerOption) apply(*serverOptions) {}
193
194// funcServerOption wraps a function that modifies serverOptions into an
195// implementation of the ServerOption interface.
196type funcServerOption struct {
197 f func(*serverOptions)
198}
199
200func (fdo *funcServerOption) apply(do *serverOptions) {
201 fdo.f(do)
202}
203
204func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
205 return &funcServerOption{
206 f: f,
207 }
208}
209
210// WriteBufferSize determines how much data can be batched before doing a write on the wire.
211// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
212// The default value for this buffer is 32KB.
213// Zero will disable the write buffer such that each write will be on underlying connection.
214// Note: A Send call may not directly translate to a write.
215func WriteBufferSize(s int) ServerOption {
216 return newFuncServerOption(func(o *serverOptions) {
217 o.writeBufferSize = s
218 })
219}
220
221// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
222// for one read syscall.
223// The default value for this buffer is 32KB.
224// Zero will disable read buffer for a connection so data framer can access the underlying
225// conn directly.
226func ReadBufferSize(s int) ServerOption {
227 return newFuncServerOption(func(o *serverOptions) {
228 o.readBufferSize = s
229 })
230}
231
232// InitialWindowSize returns a ServerOption that sets window size for stream.
233// The lower bound for window size is 64K and any value smaller than that will be ignored.
234func InitialWindowSize(s int32) ServerOption {
235 return newFuncServerOption(func(o *serverOptions) {
236 o.initialWindowSize = s
237 })
238}
239
240// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
241// The lower bound for window size is 64K and any value smaller than that will be ignored.
242func InitialConnWindowSize(s int32) ServerOption {
243 return newFuncServerOption(func(o *serverOptions) {
244 o.initialConnWindowSize = s
245 })
246}
247
248// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
249func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
250 if kp.Time > 0 && kp.Time < time.Second {
251 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
252 kp.Time = time.Second
253 }
254
255 return newFuncServerOption(func(o *serverOptions) {
256 o.keepaliveParams = kp
257 })
258}
259
260// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
261func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
262 return newFuncServerOption(func(o *serverOptions) {
263 o.keepalivePolicy = kep
264 })
265}
266
267// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
268//
269// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
270//
271// Deprecated: register codecs using encoding.RegisterCodec. The server will
272// automatically use registered codecs based on the incoming requests' headers.
273// See also
274// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
275// Will be supported throughout 1.x.
276func CustomCodec(codec Codec) ServerOption {
277 return newFuncServerOption(func(o *serverOptions) {
278 o.codec = codec
279 })
280}
281
282// ForceServerCodec returns a ServerOption that sets a codec for message
283// marshaling and unmarshaling.
284//
285// This will override any lookups by content-subtype for Codecs registered
286// with RegisterCodec.
287//
288// See Content-Type on
289// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
290// more details. Also see the documentation on RegisterCodec and
291// CallContentSubtype for more details on the interaction between encoding.Codec
292// and content-subtype.
293//
294// This function is provided for advanced users; prefer to register codecs
295// using encoding.RegisterCodec.
296// The server will automatically use registered codecs based on the incoming
297// requests' headers. See also
298// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
299// Will be supported throughout 1.x.
300//
301// Experimental
302//
303// Notice: This API is EXPERIMENTAL and may be changed or removed in a
304// later release.
305func ForceServerCodec(codec encoding.Codec) ServerOption {
306 return newFuncServerOption(func(o *serverOptions) {
307 o.codec = codec
308 })
309}
310
311// RPCCompressor returns a ServerOption that sets a compressor for outbound
312// messages. For backward compatibility, all outbound messages will be sent
313// using this compressor, regardless of incoming message compression. By
314// default, server messages will be sent using the same compressor with which
315// request messages were sent.
316//
317// Deprecated: use encoding.RegisterCompressor instead. Will be supported
318// throughout 1.x.
319func RPCCompressor(cp Compressor) ServerOption {
320 return newFuncServerOption(func(o *serverOptions) {
321 o.cp = cp
322 })
323}
324
325// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
326// messages. It has higher priority than decompressors registered via
327// encoding.RegisterCompressor.
328//
329// Deprecated: use encoding.RegisterCompressor instead. Will be supported
330// throughout 1.x.
331func RPCDecompressor(dc Decompressor) ServerOption {
332 return newFuncServerOption(func(o *serverOptions) {
333 o.dc = dc
334 })
335}
336
337// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
338// If this is not set, gRPC uses the default limit.
339//
340// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
341func MaxMsgSize(m int) ServerOption {
342 return MaxRecvMsgSize(m)
343}
344
345// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
346// If this is not set, gRPC uses the default 4MB.
347func MaxRecvMsgSize(m int) ServerOption {
348 return newFuncServerOption(func(o *serverOptions) {
349 o.maxReceiveMessageSize = m
350 })
351}
352
353// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
354// If this is not set, gRPC uses the default `math.MaxInt32`.
355func MaxSendMsgSize(m int) ServerOption {
356 return newFuncServerOption(func(o *serverOptions) {
357 o.maxSendMessageSize = m
358 })
359}
360
361// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
362// of concurrent streams to each ServerTransport.
363func MaxConcurrentStreams(n uint32) ServerOption {
364 return newFuncServerOption(func(o *serverOptions) {
365 o.maxConcurrentStreams = n
366 })
367}
368
369// Creds returns a ServerOption that sets credentials for server connections.
370func Creds(c credentials.TransportCredentials) ServerOption {
371 return newFuncServerOption(func(o *serverOptions) {
372 o.creds = c
373 })
374}
375
376// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
377// server. Only one unary interceptor can be installed. The construction of multiple
378// interceptors (e.g., chaining) can be implemented at the caller.
379func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
380 return newFuncServerOption(func(o *serverOptions) {
381 if o.unaryInt != nil {
382 panic("The unary server interceptor was already set and may not be reset.")
383 }
384 o.unaryInt = i
385 })
386}
387
388// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
389// for unary RPCs. The first interceptor will be the outer most,
390// while the last interceptor will be the inner most wrapper around the real call.
391// All unary interceptors added by this method will be chained.
392func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
393 return newFuncServerOption(func(o *serverOptions) {
394 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
395 })
396}
397
398// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
399// server. Only one stream interceptor can be installed.
400func StreamInterceptor(i StreamServerInterceptor) ServerOption {
401 return newFuncServerOption(func(o *serverOptions) {
402 if o.streamInt != nil {
403 panic("The stream server interceptor was already set and may not be reset.")
404 }
405 o.streamInt = i
406 })
407}
408
409// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
410// for streaming RPCs. The first interceptor will be the outer most,
411// while the last interceptor will be the inner most wrapper around the real call.
412// All stream interceptors added by this method will be chained.
413func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
414 return newFuncServerOption(func(o *serverOptions) {
415 o.chainStreamInts = append(o.chainStreamInts, interceptors...)
416 })
417}
418
419// InTapHandle returns a ServerOption that sets the tap handle for all the server
420// transport to be created. Only one can be installed.
421//
422// Experimental
423//
424// Notice: This API is EXPERIMENTAL and may be changed or removed in a
425// later release.
426func InTapHandle(h tap.ServerInHandle) ServerOption {
427 return newFuncServerOption(func(o *serverOptions) {
428 if o.inTapHandle != nil {
429 panic("The tap handle was already set and may not be reset.")
430 }
431 o.inTapHandle = h
432 })
433}
434
435// StatsHandler returns a ServerOption that sets the stats handler for the server.
436func StatsHandler(h stats.Handler) ServerOption {
437 return newFuncServerOption(func(o *serverOptions) {
438 o.statsHandler = h
439 })
440}
441
442// UnknownServiceHandler returns a ServerOption that allows for adding a custom
443// unknown service handler. The provided method is a bidi-streaming RPC service
444// handler that will be invoked instead of returning the "unimplemented" gRPC
445// error whenever a request is received for an unregistered service or method.
446// The handling function and stream interceptor (if set) have full access to
447// the ServerStream, including its Context.
448func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
449 return newFuncServerOption(func(o *serverOptions) {
450 o.unknownStreamDesc = &StreamDesc{
451 StreamName: "unknown_service_handler",
452 Handler: streamHandler,
453 // We need to assume that the users of the streamHandler will want to use both.
454 ClientStreams: true,
455 ServerStreams: true,
456 }
457 })
458}
459
460// ConnectionTimeout returns a ServerOption that sets the timeout for
461// connection establishment (up to and including HTTP/2 handshaking) for all
462// new connections. If this is not set, the default is 120 seconds. A zero or
463// negative value will result in an immediate timeout.
464//
465// Experimental
466//
467// Notice: This API is EXPERIMENTAL and may be changed or removed in a
468// later release.
469func ConnectionTimeout(d time.Duration) ServerOption {
470 return newFuncServerOption(func(o *serverOptions) {
471 o.connectionTimeout = d
472 })
473}
474
475// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
476// of header list that the server is prepared to accept.
477func MaxHeaderListSize(s uint32) ServerOption {
478 return newFuncServerOption(func(o *serverOptions) {
479 o.maxHeaderListSize = &s
480 })
481}
482
483// HeaderTableSize returns a ServerOption that sets the size of dynamic
484// header table for stream.
485//
486// Experimental
487//
488// Notice: This API is EXPERIMENTAL and may be changed or removed in a
489// later release.
490func HeaderTableSize(s uint32) ServerOption {
491 return newFuncServerOption(func(o *serverOptions) {
492 o.headerTableSize = &s
493 })
494}
495
496// NumStreamWorkers returns a ServerOption that sets the number of worker
497// goroutines that should be used to process incoming streams. Setting this to
498// zero (default) will disable workers and spawn a new goroutine for each
499// stream.
500//
501// Experimental
502//
503// Notice: This API is EXPERIMENTAL and may be changed or removed in a
504// later release.
505func NumStreamWorkers(numServerWorkers uint32) ServerOption {
506 // TODO: If/when this API gets stabilized (i.e. stream workers become the
507 // only way streams are processed), change the behavior of the zero value to
508 // a sane default. Preliminary experiments suggest that a value equal to the
509 // number of CPUs available is most performant; requires thorough testing.
510 return newFuncServerOption(func(o *serverOptions) {
511 o.numServerWorkers = numServerWorkers
512 })
513}
514
515// serverWorkerResetThreshold defines how often the stack must be reset. Every
516// N requests, by spawning a new goroutine in its place, a worker can reset its
517// stack so that large stacks don't live in memory forever. 2^16 should allow
518// each goroutine stack to live for at least a few seconds in a typical
519// workload (assuming a QPS of a few thousand requests/sec).
520const serverWorkerResetThreshold = 1 << 16
521
522// serverWorkers blocks on a *transport.Stream channel forever and waits for
523// data to be fed by serveStreams. This allows different requests to be
524// processed by the same goroutine, removing the need for expensive stack
525// re-allocations (see the runtime.morestack problem [1]).
526//
527// [1] https://github.com/golang/go/issues/18138
528func (s *Server) serverWorker(ch chan *serverWorkerData) {
529 // To make sure all server workers don't reset at the same time, choose a
530 // random number of iterations before resetting.
531 threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
532 for completed := 0; completed < threshold; completed++ {
533 data, ok := <-ch
534 if !ok {
535 return
536 }
537 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
538 data.wg.Done()
539 }
540 go s.serverWorker(ch)
541}
542
543// initServerWorkers creates worker goroutines and channels to process incoming
544// connections to reduce the time spent overall on runtime.morestack.
545func (s *Server) initServerWorkers() {
546 s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
547 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
548 s.serverWorkerChannels[i] = make(chan *serverWorkerData)
549 go s.serverWorker(s.serverWorkerChannels[i])
550 }
551}
552
553func (s *Server) stopServerWorkers() {
554 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
555 close(s.serverWorkerChannels[i])
556 }
557}
558
559// NewServer creates a gRPC server which has no service registered and has not
560// started to accept requests yet.
561func NewServer(opt ...ServerOption) *Server {
562 opts := defaultServerOptions
563 for _, o := range opt {
564 o.apply(&opts)
565 }
566 s := &Server{
567 lis: make(map[net.Listener]bool),
568 opts: opts,
569 conns: make(map[string]map[transport.ServerTransport]bool),
570 services: make(map[string]*serviceInfo),
571 quit: grpcsync.NewEvent(),
572 done: grpcsync.NewEvent(),
573 czData: new(channelzData),
574 }
575 chainUnaryServerInterceptors(s)
576 chainStreamServerInterceptors(s)
577 s.cv = sync.NewCond(&s.mu)
578 if EnableTracing {
579 _, file, line, _ := runtime.Caller(1)
580 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
581 }
582
583 if s.opts.numServerWorkers > 0 {
584 s.initServerWorkers()
585 }
586
587 if channelz.IsOn() {
588 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
589 }
590 return s
591}
592
593// printf records an event in s's event log, unless s has been stopped.
594// REQUIRES s.mu is held.
595func (s *Server) printf(format string, a ...interface{}) {
596 if s.events != nil {
597 s.events.Printf(format, a...)
598 }
599}
600
601// errorf records an error in s's event log, unless s has been stopped.
602// REQUIRES s.mu is held.
603func (s *Server) errorf(format string, a ...interface{}) {
604 if s.events != nil {
605 s.events.Errorf(format, a...)
606 }
607}
608
609// ServiceRegistrar wraps a single method that supports service registration. It
610// enables users to pass concrete types other than grpc.Server to the service
611// registration methods exported by the IDL generated code.
612type ServiceRegistrar interface {
613 // RegisterService registers a service and its implementation to the
614 // concrete type implementing this interface. It may not be called
615 // once the server has started serving.
616 // desc describes the service and its methods and handlers. impl is the
617 // service implementation which is passed to the method handlers.
618 RegisterService(desc *ServiceDesc, impl interface{})
619}
620
621// RegisterService registers a service and its implementation to the gRPC
622// server. It is called from the IDL generated code. This must be called before
623// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
624// ensure it implements sd.HandlerType.
625func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
626 if ss != nil {
627 ht := reflect.TypeOf(sd.HandlerType).Elem()
628 st := reflect.TypeOf(ss)
629 if !st.Implements(ht) {
630 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
631 }
632 }
633 s.register(sd, ss)
634}
635
636func (s *Server) register(sd *ServiceDesc, ss interface{}) {
637 s.mu.Lock()
638 defer s.mu.Unlock()
639 s.printf("RegisterService(%q)", sd.ServiceName)
640 if s.serve {
641 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
642 }
643 if _, ok := s.services[sd.ServiceName]; ok {
644 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
645 }
646 info := &serviceInfo{
647 serviceImpl: ss,
648 methods: make(map[string]*MethodDesc),
649 streams: make(map[string]*StreamDesc),
650 mdata: sd.Metadata,
651 }
652 for i := range sd.Methods {
653 d := &sd.Methods[i]
654 info.methods[d.MethodName] = d
655 }
656 for i := range sd.Streams {
657 d := &sd.Streams[i]
658 info.streams[d.StreamName] = d
659 }
660 s.services[sd.ServiceName] = info
661}
662
663// MethodInfo contains the information of an RPC including its method name and type.
664type MethodInfo struct {
665 // Name is the method name only, without the service name or package name.
666 Name string
667 // IsClientStream indicates whether the RPC is a client streaming RPC.
668 IsClientStream bool
669 // IsServerStream indicates whether the RPC is a server streaming RPC.
670 IsServerStream bool
671}
672
673// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
674type ServiceInfo struct {
675 Methods []MethodInfo
676 // Metadata is the metadata specified in ServiceDesc when registering service.
677 Metadata interface{}
678}
679
680// GetServiceInfo returns a map from service names to ServiceInfo.
681// Service names include the package names, in the form of <package>.<service>.
682func (s *Server) GetServiceInfo() map[string]ServiceInfo {
683 ret := make(map[string]ServiceInfo)
684 for n, srv := range s.services {
685 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
686 for m := range srv.methods {
687 methods = append(methods, MethodInfo{
688 Name: m,
689 IsClientStream: false,
690 IsServerStream: false,
691 })
692 }
693 for m, d := range srv.streams {
694 methods = append(methods, MethodInfo{
695 Name: m,
696 IsClientStream: d.ClientStreams,
697 IsServerStream: d.ServerStreams,
698 })
699 }
700
701 ret[n] = ServiceInfo{
702 Methods: methods,
703 Metadata: srv.mdata,
704 }
705 }
706 return ret
707}
708
709// ErrServerStopped indicates that the operation is now illegal because of
710// the server being stopped.
711var ErrServerStopped = errors.New("grpc: the server has been stopped")
712
713type listenSocket struct {
714 net.Listener
715 channelzID int64
716}
717
718func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
719 return &channelz.SocketInternalMetric{
720 SocketOptions: channelz.GetSocketOption(l.Listener),
721 LocalAddr: l.Listener.Addr(),
722 }
723}
724
725func (l *listenSocket) Close() error {
726 err := l.Listener.Close()
727 if channelz.IsOn() {
728 channelz.RemoveEntry(l.channelzID)
729 }
730 return err
731}
732
733// Serve accepts incoming connections on the listener lis, creating a new
734// ServerTransport and service goroutine for each. The service goroutines
735// read gRPC requests and then call the registered handlers to reply to them.
736// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
737// this method returns.
738// Serve will return a non-nil error unless Stop or GracefulStop is called.
739func (s *Server) Serve(lis net.Listener) error {
740 s.mu.Lock()
741 s.printf("serving")
742 s.serve = true
743 if s.lis == nil {
744 // Serve called after Stop or GracefulStop.
745 s.mu.Unlock()
746 lis.Close()
747 return ErrServerStopped
748 }
749
750 s.serveWG.Add(1)
751 defer func() {
752 s.serveWG.Done()
753 if s.quit.HasFired() {
754 // Stop or GracefulStop called; block until done and return nil.
755 <-s.done.Done()
756 }
757 }()
758
759 ls := &listenSocket{Listener: lis}
760 s.lis[ls] = true
761
762 if channelz.IsOn() {
763 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
764 }
765 s.mu.Unlock()
766
767 defer func() {
768 s.mu.Lock()
769 if s.lis != nil && s.lis[ls] {
770 ls.Close()
771 delete(s.lis, ls)
772 }
773 s.mu.Unlock()
774 }()
775
776 var tempDelay time.Duration // how long to sleep on accept failure
777
778 for {
779 rawConn, err := lis.Accept()
780 if err != nil {
781 if ne, ok := err.(interface {
782 Temporary() bool
783 }); ok && ne.Temporary() {
784 if tempDelay == 0 {
785 tempDelay = 5 * time.Millisecond
786 } else {
787 tempDelay *= 2
788 }
789 if max := 1 * time.Second; tempDelay > max {
790 tempDelay = max
791 }
792 s.mu.Lock()
793 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
794 s.mu.Unlock()
795 timer := time.NewTimer(tempDelay)
796 select {
797 case <-timer.C:
798 case <-s.quit.Done():
799 timer.Stop()
800 return nil
801 }
802 continue
803 }
804 s.mu.Lock()
805 s.printf("done serving; Accept = %v", err)
806 s.mu.Unlock()
807
808 if s.quit.HasFired() {
809 return nil
810 }
811 return err
812 }
813 tempDelay = 0
814 // Start a new goroutine to deal with rawConn so we don't stall this Accept
815 // loop goroutine.
816 //
817 // Make sure we account for the goroutine so GracefulStop doesn't nil out
818 // s.conns before this conn can be added.
819 s.serveWG.Add(1)
820 go func() {
821 s.handleRawConn(lis.Addr().String(), rawConn)
822 s.serveWG.Done()
823 }()
824 }
825}
826
827// handleRawConn forks a goroutine to handle a just-accepted connection that
828// has not had any I/O performed on it yet.
829func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
830 if s.quit.HasFired() {
831 rawConn.Close()
832 return
833 }
834 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
835
836 // Finish handshaking (HTTP2)
837 st := s.newHTTP2Transport(rawConn)
838 rawConn.SetDeadline(time.Time{})
839 if st == nil {
840 return
841 }
842
843 if !s.addConn(lisAddr, st) {
844 return
845 }
846 go func() {
847 s.serveStreams(st)
848 s.removeConn(lisAddr, st)
849 }()
850}
851
852func (s *Server) drainServerTransports(addr string) {
853 s.mu.Lock()
854 conns := s.conns[addr]
855 for st := range conns {
856 st.Drain()
857 }
858 s.mu.Unlock()
859}
860
861// newHTTP2Transport sets up a http/2 transport (using the
862// gRPC http2 server transport in transport/http2_server.go).
863func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
864 config := &transport.ServerConfig{
865 MaxStreams: s.opts.maxConcurrentStreams,
866 ConnectionTimeout: s.opts.connectionTimeout,
867 Credentials: s.opts.creds,
868 InTapHandle: s.opts.inTapHandle,
869 StatsHandler: s.opts.statsHandler,
870 KeepaliveParams: s.opts.keepaliveParams,
871 KeepalivePolicy: s.opts.keepalivePolicy,
872 InitialWindowSize: s.opts.initialWindowSize,
873 InitialConnWindowSize: s.opts.initialConnWindowSize,
874 WriteBufferSize: s.opts.writeBufferSize,
875 ReadBufferSize: s.opts.readBufferSize,
876 ChannelzParentID: s.channelzID,
877 MaxHeaderListSize: s.opts.maxHeaderListSize,
878 HeaderTableSize: s.opts.headerTableSize,
879 }
880 st, err := transport.NewServerTransport(c, config)
881 if err != nil {
882 s.mu.Lock()
883 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
884 s.mu.Unlock()
885 // ErrConnDispatched means that the connection was dispatched away from
886 // gRPC; those connections should be left open.
887 if err != credentials.ErrConnDispatched {
khenaidoo5cb0d402021-12-08 14:09:16 -0500888 // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400889 if err != io.EOF {
890 channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
891 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500892 c.Close()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400893 }
894 return nil
895 }
896
897 return st
898}
899
900func (s *Server) serveStreams(st transport.ServerTransport) {
901 defer st.Close()
902 var wg sync.WaitGroup
903
904 var roundRobinCounter uint32
905 st.HandleStreams(func(stream *transport.Stream) {
906 wg.Add(1)
907 if s.opts.numServerWorkers > 0 {
908 data := &serverWorkerData{st: st, wg: &wg, stream: stream}
909 select {
910 case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
911 default:
912 // If all stream workers are busy, fallback to the default code path.
913 go func() {
914 s.handleStream(st, stream, s.traceInfo(st, stream))
915 wg.Done()
916 }()
917 }
918 } else {
919 go func() {
920 defer wg.Done()
921 s.handleStream(st, stream, s.traceInfo(st, stream))
922 }()
923 }
924 }, func(ctx context.Context, method string) context.Context {
925 if !EnableTracing {
926 return ctx
927 }
928 tr := trace.New("grpc.Recv."+methodFamily(method), method)
929 return trace.NewContext(ctx, tr)
930 })
931 wg.Wait()
932}
933
934var _ http.Handler = (*Server)(nil)
935
936// ServeHTTP implements the Go standard library's http.Handler
937// interface by responding to the gRPC request r, by looking up
938// the requested gRPC method in the gRPC server s.
939//
940// The provided HTTP request must have arrived on an HTTP/2
941// connection. When using the Go standard library's server,
942// practically this means that the Request must also have arrived
943// over TLS.
944//
945// To share one port (such as 443 for https) between gRPC and an
946// existing http.Handler, use a root http.Handler such as:
947//
948// if r.ProtoMajor == 2 && strings.HasPrefix(
949// r.Header.Get("Content-Type"), "application/grpc") {
950// grpcServer.ServeHTTP(w, r)
951// } else {
952// yourMux.ServeHTTP(w, r)
953// }
954//
955// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
956// separate from grpc-go's HTTP/2 server. Performance and features may vary
957// between the two paths. ServeHTTP does not support some gRPC features
958// available through grpc-go's HTTP/2 server.
959//
960// Experimental
961//
962// Notice: This API is EXPERIMENTAL and may be changed or removed in a
963// later release.
964func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
965 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
966 if err != nil {
967 http.Error(w, err.Error(), http.StatusInternalServerError)
968 return
969 }
970 if !s.addConn(listenerAddressForServeHTTP, st) {
971 return
972 }
973 defer s.removeConn(listenerAddressForServeHTTP, st)
974 s.serveStreams(st)
975}
976
977// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
978// If tracing is not enabled, it returns nil.
979func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
980 if !EnableTracing {
981 return nil
982 }
983 tr, ok := trace.FromContext(stream.Context())
984 if !ok {
985 return nil
986 }
987
988 trInfo = &traceInfo{
989 tr: tr,
990 firstLine: firstLine{
991 client: false,
992 remoteAddr: st.RemoteAddr(),
993 },
994 }
995 if dl, ok := stream.Context().Deadline(); ok {
996 trInfo.firstLine.deadline = time.Until(dl)
997 }
998 return trInfo
999}
1000
1001func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
1002 s.mu.Lock()
1003 defer s.mu.Unlock()
1004 if s.conns == nil {
1005 st.Close()
1006 return false
1007 }
1008 if s.drain {
1009 // Transport added after we drained our existing conns: drain it
1010 // immediately.
1011 st.Drain()
1012 }
1013
1014 if s.conns[addr] == nil {
1015 // Create a map entry if this is the first connection on this listener.
1016 s.conns[addr] = make(map[transport.ServerTransport]bool)
1017 }
1018 s.conns[addr][st] = true
1019 return true
1020}
1021
1022func (s *Server) removeConn(addr string, st transport.ServerTransport) {
1023 s.mu.Lock()
1024 defer s.mu.Unlock()
1025
1026 conns := s.conns[addr]
1027 if conns != nil {
1028 delete(conns, st)
1029 if len(conns) == 0 {
1030 // If the last connection for this address is being removed, also
1031 // remove the map entry corresponding to the address. This is used
1032 // in GracefulStop() when waiting for all connections to be closed.
1033 delete(s.conns, addr)
1034 }
1035 s.cv.Broadcast()
1036 }
1037}
1038
1039func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
1040 return &channelz.ServerInternalMetric{
1041 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
1042 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
1043 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
1044 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
1045 }
1046}
1047
1048func (s *Server) incrCallsStarted() {
1049 atomic.AddInt64(&s.czData.callsStarted, 1)
1050 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
1051}
1052
1053func (s *Server) incrCallsSucceeded() {
1054 atomic.AddInt64(&s.czData.callsSucceeded, 1)
1055}
1056
1057func (s *Server) incrCallsFailed() {
1058 atomic.AddInt64(&s.czData.callsFailed, 1)
1059}
1060
1061func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
1062 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
1063 if err != nil {
1064 channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
1065 return err
1066 }
1067 compData, err := compress(data, cp, comp)
1068 if err != nil {
1069 channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
1070 return err
1071 }
1072 hdr, payload := msgHeader(data, compData)
1073 // TODO(dfawley): should we be checking len(data) instead?
1074 if len(payload) > s.opts.maxSendMessageSize {
1075 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
1076 }
1077 err = t.Write(stream, hdr, payload, opts)
1078 if err == nil && s.opts.statsHandler != nil {
1079 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
1080 }
1081 return err
1082}
1083
1084// chainUnaryServerInterceptors chains all unary server interceptors into one.
1085func chainUnaryServerInterceptors(s *Server) {
1086 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1087 // be executed before any other chained interceptors.
1088 interceptors := s.opts.chainUnaryInts
1089 if s.opts.unaryInt != nil {
1090 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
1091 }
1092
1093 var chainedInt UnaryServerInterceptor
1094 if len(interceptors) == 0 {
1095 chainedInt = nil
1096 } else if len(interceptors) == 1 {
1097 chainedInt = interceptors[0]
1098 } else {
1099 chainedInt = chainUnaryInterceptors(interceptors)
1100 }
1101
1102 s.opts.unaryInt = chainedInt
1103}
1104
1105func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
1106 return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
khenaidoo5cb0d402021-12-08 14:09:16 -05001107 // the struct ensures the variables are allocated together, rather than separately, since we
1108 // know they should be garbage collected together. This saves 1 allocation and decreases
1109 // time/call by about 10% on the microbenchmark.
1110 var state struct {
1111 i int
1112 next UnaryHandler
khenaidoo5fc5cea2021-08-11 17:39:16 -04001113 }
khenaidoo5cb0d402021-12-08 14:09:16 -05001114 state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
1115 if state.i == len(interceptors)-1 {
1116 return interceptors[state.i](ctx, req, info, handler)
1117 }
1118 state.i++
1119 return interceptors[state.i-1](ctx, req, info, state.next)
1120 }
1121 return state.next(ctx, req)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001122 }
1123}
1124
1125func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
1126 sh := s.opts.statsHandler
1127 if sh != nil || trInfo != nil || channelz.IsOn() {
1128 if channelz.IsOn() {
1129 s.incrCallsStarted()
1130 }
1131 var statsBegin *stats.Begin
1132 if sh != nil {
1133 beginTime := time.Now()
1134 statsBegin = &stats.Begin{
1135 BeginTime: beginTime,
1136 IsClientStream: false,
1137 IsServerStream: false,
1138 }
1139 sh.HandleRPC(stream.Context(), statsBegin)
1140 }
1141 if trInfo != nil {
1142 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1143 }
1144 // The deferred error handling for tracing, stats handler and channelz are
1145 // combined into one function to reduce stack usage -- a defer takes ~56-64
1146 // bytes on the stack, so overflowing the stack will require a stack
1147 // re-allocation, which is expensive.
1148 //
1149 // To maintain behavior similar to separate deferred statements, statements
1150 // should be executed in the reverse order. That is, tracing first, stats
1151 // handler second, and channelz last. Note that panics *within* defers will
1152 // lead to different behavior, but that's an acceptable compromise; that
1153 // would be undefined behavior territory anyway.
1154 defer func() {
1155 if trInfo != nil {
1156 if err != nil && err != io.EOF {
1157 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1158 trInfo.tr.SetError()
1159 }
1160 trInfo.tr.Finish()
1161 }
1162
1163 if sh != nil {
1164 end := &stats.End{
1165 BeginTime: statsBegin.BeginTime,
1166 EndTime: time.Now(),
1167 }
1168 if err != nil && err != io.EOF {
1169 end.Error = toRPCErr(err)
1170 }
1171 sh.HandleRPC(stream.Context(), end)
1172 }
1173
1174 if channelz.IsOn() {
1175 if err != nil && err != io.EOF {
1176 s.incrCallsFailed()
1177 } else {
1178 s.incrCallsSucceeded()
1179 }
1180 }
1181 }()
1182 }
1183
1184 binlog := binarylog.GetMethodLogger(stream.Method())
1185 if binlog != nil {
1186 ctx := stream.Context()
1187 md, _ := metadata.FromIncomingContext(ctx)
1188 logEntry := &binarylog.ClientHeader{
1189 Header: md,
1190 MethodName: stream.Method(),
1191 PeerAddr: nil,
1192 }
1193 if deadline, ok := ctx.Deadline(); ok {
1194 logEntry.Timeout = time.Until(deadline)
1195 if logEntry.Timeout < 0 {
1196 logEntry.Timeout = 0
1197 }
1198 }
1199 if a := md[":authority"]; len(a) > 0 {
1200 logEntry.Authority = a[0]
1201 }
1202 if peer, ok := peer.FromContext(ctx); ok {
1203 logEntry.PeerAddr = peer.Addr
1204 }
1205 binlog.Log(logEntry)
1206 }
1207
1208 // comp and cp are used for compression. decomp and dc are used for
1209 // decompression. If comp and decomp are both set, they are the same;
1210 // however they are kept separate to ensure that at most one of the
1211 // compressor/decompressor variable pairs are set for use later.
1212 var comp, decomp encoding.Compressor
1213 var cp Compressor
1214 var dc Decompressor
1215
1216 // If dc is set and matches the stream's compression, use it. Otherwise, try
1217 // to find a matching registered compressor for decomp.
1218 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1219 dc = s.opts.dc
1220 } else if rc != "" && rc != encoding.Identity {
1221 decomp = encoding.GetCompressor(rc)
1222 if decomp == nil {
1223 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1224 t.WriteStatus(stream, st)
1225 return st.Err()
1226 }
1227 }
1228
1229 // If cp is set, use it. Otherwise, attempt to compress the response using
1230 // the incoming message compression method.
1231 //
1232 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1233 if s.opts.cp != nil {
1234 cp = s.opts.cp
1235 stream.SetSendCompress(cp.Type())
1236 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1237 // Legacy compressor not specified; attempt to respond with same encoding.
1238 comp = encoding.GetCompressor(rc)
1239 if comp != nil {
1240 stream.SetSendCompress(rc)
1241 }
1242 }
1243
1244 var payInfo *payloadInfo
1245 if sh != nil || binlog != nil {
1246 payInfo = &payloadInfo{}
1247 }
1248 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
1249 if err != nil {
1250 if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
1251 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
1252 }
1253 return err
1254 }
1255 if channelz.IsOn() {
1256 t.IncrMsgRecv()
1257 }
1258 df := func(v interface{}) error {
1259 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1260 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1261 }
1262 if sh != nil {
1263 sh.HandleRPC(stream.Context(), &stats.InPayload{
1264 RecvTime: time.Now(),
1265 Payload: v,
1266 WireLength: payInfo.wireLength + headerLen,
1267 Data: d,
1268 Length: len(d),
1269 })
1270 }
1271 if binlog != nil {
1272 binlog.Log(&binarylog.ClientMessage{
1273 Message: d,
1274 })
1275 }
1276 if trInfo != nil {
1277 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1278 }
1279 return nil
1280 }
1281 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1282 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
1283 if appErr != nil {
1284 appStatus, ok := status.FromError(appErr)
1285 if !ok {
1286 // Convert appErr if it is not a grpc status error.
1287 appErr = status.Error(codes.Unknown, appErr.Error())
1288 appStatus, _ = status.FromError(appErr)
1289 }
1290 if trInfo != nil {
1291 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1292 trInfo.tr.SetError()
1293 }
1294 if e := t.WriteStatus(stream, appStatus); e != nil {
1295 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1296 }
1297 if binlog != nil {
1298 if h, _ := stream.Header(); h.Len() > 0 {
1299 // Only log serverHeader if there was header. Otherwise it can
1300 // be trailer only.
1301 binlog.Log(&binarylog.ServerHeader{
1302 Header: h,
1303 })
1304 }
1305 binlog.Log(&binarylog.ServerTrailer{
1306 Trailer: stream.Trailer(),
1307 Err: appErr,
1308 })
1309 }
1310 return appErr
1311 }
1312 if trInfo != nil {
1313 trInfo.tr.LazyLog(stringer("OK"), false)
1314 }
1315 opts := &transport.Options{Last: true}
1316
1317 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1318 if err == io.EOF {
1319 // The entire stream is done (for unary RPC only).
1320 return err
1321 }
1322 if sts, ok := status.FromError(err); ok {
1323 if e := t.WriteStatus(stream, sts); e != nil {
1324 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1325 }
1326 } else {
1327 switch st := err.(type) {
1328 case transport.ConnectionError:
1329 // Nothing to do here.
1330 default:
1331 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1332 }
1333 }
1334 if binlog != nil {
1335 h, _ := stream.Header()
1336 binlog.Log(&binarylog.ServerHeader{
1337 Header: h,
1338 })
1339 binlog.Log(&binarylog.ServerTrailer{
1340 Trailer: stream.Trailer(),
1341 Err: appErr,
1342 })
1343 }
1344 return err
1345 }
1346 if binlog != nil {
1347 h, _ := stream.Header()
1348 binlog.Log(&binarylog.ServerHeader{
1349 Header: h,
1350 })
1351 binlog.Log(&binarylog.ServerMessage{
1352 Message: reply,
1353 })
1354 }
1355 if channelz.IsOn() {
1356 t.IncrMsgSent()
1357 }
1358 if trInfo != nil {
1359 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1360 }
1361 // TODO: Should we be logging if writing status failed here, like above?
1362 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1363 // error or allow the stats handler to see it?
1364 err = t.WriteStatus(stream, statusOK)
1365 if binlog != nil {
1366 binlog.Log(&binarylog.ServerTrailer{
1367 Trailer: stream.Trailer(),
1368 Err: appErr,
1369 })
1370 }
1371 return err
1372}
1373
1374// chainStreamServerInterceptors chains all stream server interceptors into one.
1375func chainStreamServerInterceptors(s *Server) {
1376 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1377 // be executed before any other chained interceptors.
1378 interceptors := s.opts.chainStreamInts
1379 if s.opts.streamInt != nil {
1380 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1381 }
1382
1383 var chainedInt StreamServerInterceptor
1384 if len(interceptors) == 0 {
1385 chainedInt = nil
1386 } else if len(interceptors) == 1 {
1387 chainedInt = interceptors[0]
1388 } else {
1389 chainedInt = chainStreamInterceptors(interceptors)
1390 }
1391
1392 s.opts.streamInt = chainedInt
1393}
1394
1395func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1396 return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
khenaidoo5cb0d402021-12-08 14:09:16 -05001397 // the struct ensures the variables are allocated together, rather than separately, since we
1398 // know they should be garbage collected together. This saves 1 allocation and decreases
1399 // time/call by about 10% on the microbenchmark.
1400 var state struct {
1401 i int
1402 next StreamHandler
khenaidoo5fc5cea2021-08-11 17:39:16 -04001403 }
khenaidoo5cb0d402021-12-08 14:09:16 -05001404 state.next = func(srv interface{}, ss ServerStream) error {
1405 if state.i == len(interceptors)-1 {
1406 return interceptors[state.i](srv, ss, info, handler)
1407 }
1408 state.i++
1409 return interceptors[state.i-1](srv, ss, info, state.next)
1410 }
1411 return state.next(srv, ss)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001412 }
1413}
1414
1415func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
1416 if channelz.IsOn() {
1417 s.incrCallsStarted()
1418 }
1419 sh := s.opts.statsHandler
1420 var statsBegin *stats.Begin
1421 if sh != nil {
1422 beginTime := time.Now()
1423 statsBegin = &stats.Begin{
1424 BeginTime: beginTime,
1425 IsClientStream: sd.ClientStreams,
1426 IsServerStream: sd.ServerStreams,
1427 }
1428 sh.HandleRPC(stream.Context(), statsBegin)
1429 }
1430 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1431 ss := &serverStream{
1432 ctx: ctx,
1433 t: t,
1434 s: stream,
1435 p: &parser{r: stream},
1436 codec: s.getCodec(stream.ContentSubtype()),
1437 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1438 maxSendMessageSize: s.opts.maxSendMessageSize,
1439 trInfo: trInfo,
1440 statsHandler: sh,
1441 }
1442
1443 if sh != nil || trInfo != nil || channelz.IsOn() {
1444 // See comment in processUnaryRPC on defers.
1445 defer func() {
1446 if trInfo != nil {
1447 ss.mu.Lock()
1448 if err != nil && err != io.EOF {
1449 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1450 ss.trInfo.tr.SetError()
1451 }
1452 ss.trInfo.tr.Finish()
1453 ss.trInfo.tr = nil
1454 ss.mu.Unlock()
1455 }
1456
1457 if sh != nil {
1458 end := &stats.End{
1459 BeginTime: statsBegin.BeginTime,
1460 EndTime: time.Now(),
1461 }
1462 if err != nil && err != io.EOF {
1463 end.Error = toRPCErr(err)
1464 }
1465 sh.HandleRPC(stream.Context(), end)
1466 }
1467
1468 if channelz.IsOn() {
1469 if err != nil && err != io.EOF {
1470 s.incrCallsFailed()
1471 } else {
1472 s.incrCallsSucceeded()
1473 }
1474 }
1475 }()
1476 }
1477
1478 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1479 if ss.binlog != nil {
1480 md, _ := metadata.FromIncomingContext(ctx)
1481 logEntry := &binarylog.ClientHeader{
1482 Header: md,
1483 MethodName: stream.Method(),
1484 PeerAddr: nil,
1485 }
1486 if deadline, ok := ctx.Deadline(); ok {
1487 logEntry.Timeout = time.Until(deadline)
1488 if logEntry.Timeout < 0 {
1489 logEntry.Timeout = 0
1490 }
1491 }
1492 if a := md[":authority"]; len(a) > 0 {
1493 logEntry.Authority = a[0]
1494 }
1495 if peer, ok := peer.FromContext(ss.Context()); ok {
1496 logEntry.PeerAddr = peer.Addr
1497 }
1498 ss.binlog.Log(logEntry)
1499 }
1500
1501 // If dc is set and matches the stream's compression, use it. Otherwise, try
1502 // to find a matching registered compressor for decomp.
1503 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1504 ss.dc = s.opts.dc
1505 } else if rc != "" && rc != encoding.Identity {
1506 ss.decomp = encoding.GetCompressor(rc)
1507 if ss.decomp == nil {
1508 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1509 t.WriteStatus(ss.s, st)
1510 return st.Err()
1511 }
1512 }
1513
1514 // If cp is set, use it. Otherwise, attempt to compress the response using
1515 // the incoming message compression method.
1516 //
1517 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1518 if s.opts.cp != nil {
1519 ss.cp = s.opts.cp
1520 stream.SetSendCompress(s.opts.cp.Type())
1521 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1522 // Legacy compressor not specified; attempt to respond with same encoding.
1523 ss.comp = encoding.GetCompressor(rc)
1524 if ss.comp != nil {
1525 stream.SetSendCompress(rc)
1526 }
1527 }
1528
1529 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
1530
1531 if trInfo != nil {
1532 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1533 }
1534 var appErr error
1535 var server interface{}
1536 if info != nil {
1537 server = info.serviceImpl
1538 }
1539 if s.opts.streamInt == nil {
1540 appErr = sd.Handler(server, ss)
1541 } else {
1542 info := &StreamServerInfo{
1543 FullMethod: stream.Method(),
1544 IsClientStream: sd.ClientStreams,
1545 IsServerStream: sd.ServerStreams,
1546 }
1547 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1548 }
1549 if appErr != nil {
1550 appStatus, ok := status.FromError(appErr)
1551 if !ok {
1552 appStatus = status.New(codes.Unknown, appErr.Error())
1553 appErr = appStatus.Err()
1554 }
1555 if trInfo != nil {
1556 ss.mu.Lock()
1557 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1558 ss.trInfo.tr.SetError()
1559 ss.mu.Unlock()
1560 }
1561 t.WriteStatus(ss.s, appStatus)
1562 if ss.binlog != nil {
1563 ss.binlog.Log(&binarylog.ServerTrailer{
1564 Trailer: ss.s.Trailer(),
1565 Err: appErr,
1566 })
1567 }
1568 // TODO: Should we log an error from WriteStatus here and below?
1569 return appErr
1570 }
1571 if trInfo != nil {
1572 ss.mu.Lock()
1573 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1574 ss.mu.Unlock()
1575 }
1576 err = t.WriteStatus(ss.s, statusOK)
1577 if ss.binlog != nil {
1578 ss.binlog.Log(&binarylog.ServerTrailer{
1579 Trailer: ss.s.Trailer(),
1580 Err: appErr,
1581 })
1582 }
1583 return err
1584}
1585
1586func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1587 sm := stream.Method()
1588 if sm != "" && sm[0] == '/' {
1589 sm = sm[1:]
1590 }
1591 pos := strings.LastIndex(sm, "/")
1592 if pos == -1 {
1593 if trInfo != nil {
1594 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1595 trInfo.tr.SetError()
1596 }
1597 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1598 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1599 if trInfo != nil {
1600 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1601 trInfo.tr.SetError()
1602 }
1603 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1604 }
1605 if trInfo != nil {
1606 trInfo.tr.Finish()
1607 }
1608 return
1609 }
1610 service := sm[:pos]
1611 method := sm[pos+1:]
1612
1613 srv, knownService := s.services[service]
1614 if knownService {
1615 if md, ok := srv.methods[method]; ok {
1616 s.processUnaryRPC(t, stream, srv, md, trInfo)
1617 return
1618 }
1619 if sd, ok := srv.streams[method]; ok {
1620 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1621 return
1622 }
1623 }
1624 // Unknown service, or known server unknown method.
1625 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1626 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1627 return
1628 }
1629 var errDesc string
1630 if !knownService {
1631 errDesc = fmt.Sprintf("unknown service %v", service)
1632 } else {
1633 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1634 }
1635 if trInfo != nil {
1636 trInfo.tr.LazyPrintf("%s", errDesc)
1637 trInfo.tr.SetError()
1638 }
1639 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1640 if trInfo != nil {
1641 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1642 trInfo.tr.SetError()
1643 }
1644 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1645 }
1646 if trInfo != nil {
1647 trInfo.tr.Finish()
1648 }
1649}
1650
1651// The key to save ServerTransportStream in the context.
1652type streamKey struct{}
1653
1654// NewContextWithServerTransportStream creates a new context from ctx and
1655// attaches stream to it.
1656//
1657// Experimental
1658//
1659// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1660// later release.
1661func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1662 return context.WithValue(ctx, streamKey{}, stream)
1663}
1664
1665// ServerTransportStream is a minimal interface that a transport stream must
1666// implement. This can be used to mock an actual transport stream for tests of
1667// handler code that use, for example, grpc.SetHeader (which requires some
1668// stream to be in context).
1669//
1670// See also NewContextWithServerTransportStream.
1671//
1672// Experimental
1673//
1674// Notice: This type is EXPERIMENTAL and may be changed or removed in a
1675// later release.
1676type ServerTransportStream interface {
1677 Method() string
1678 SetHeader(md metadata.MD) error
1679 SendHeader(md metadata.MD) error
1680 SetTrailer(md metadata.MD) error
1681}
1682
1683// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1684// ctx. Returns nil if the given context has no stream associated with it
1685// (which implies it is not an RPC invocation context).
1686//
1687// Experimental
1688//
1689// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1690// later release.
1691func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1692 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1693 return s
1694}
1695
1696// Stop stops the gRPC server. It immediately closes all open
1697// connections and listeners.
1698// It cancels all active RPCs on the server side and the corresponding
1699// pending RPCs on the client side will get notified by connection
1700// errors.
1701func (s *Server) Stop() {
1702 s.quit.Fire()
1703
1704 defer func() {
1705 s.serveWG.Wait()
1706 s.done.Fire()
1707 }()
1708
1709 s.channelzRemoveOnce.Do(func() {
1710 if channelz.IsOn() {
1711 channelz.RemoveEntry(s.channelzID)
1712 }
1713 })
1714
1715 s.mu.Lock()
1716 listeners := s.lis
1717 s.lis = nil
1718 conns := s.conns
1719 s.conns = nil
1720 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1721 s.cv.Broadcast()
1722 s.mu.Unlock()
1723
1724 for lis := range listeners {
1725 lis.Close()
1726 }
1727 for _, cs := range conns {
1728 for st := range cs {
1729 st.Close()
1730 }
1731 }
1732 if s.opts.numServerWorkers > 0 {
1733 s.stopServerWorkers()
1734 }
1735
1736 s.mu.Lock()
1737 if s.events != nil {
1738 s.events.Finish()
1739 s.events = nil
1740 }
1741 s.mu.Unlock()
1742}
1743
1744// GracefulStop stops the gRPC server gracefully. It stops the server from
1745// accepting new connections and RPCs and blocks until all the pending RPCs are
1746// finished.
1747func (s *Server) GracefulStop() {
1748 s.quit.Fire()
1749 defer s.done.Fire()
1750
1751 s.channelzRemoveOnce.Do(func() {
1752 if channelz.IsOn() {
1753 channelz.RemoveEntry(s.channelzID)
1754 }
1755 })
1756 s.mu.Lock()
1757 if s.conns == nil {
1758 s.mu.Unlock()
1759 return
1760 }
1761
1762 for lis := range s.lis {
1763 lis.Close()
1764 }
1765 s.lis = nil
1766 if !s.drain {
1767 for _, conns := range s.conns {
1768 for st := range conns {
1769 st.Drain()
1770 }
1771 }
1772 s.drain = true
1773 }
1774
1775 // Wait for serving threads to be ready to exit. Only then can we be sure no
1776 // new conns will be created.
1777 s.mu.Unlock()
1778 s.serveWG.Wait()
1779 s.mu.Lock()
1780
1781 for len(s.conns) != 0 {
1782 s.cv.Wait()
1783 }
1784 s.conns = nil
1785 if s.events != nil {
1786 s.events.Finish()
1787 s.events = nil
1788 }
1789 s.mu.Unlock()
1790}
1791
1792// contentSubtype must be lowercase
1793// cannot return nil
1794func (s *Server) getCodec(contentSubtype string) baseCodec {
1795 if s.opts.codec != nil {
1796 return s.opts.codec
1797 }
1798 if contentSubtype == "" {
1799 return encoding.GetCodec(proto.Name)
1800 }
1801 codec := encoding.GetCodec(contentSubtype)
1802 if codec == nil {
1803 return encoding.GetCodec(proto.Name)
1804 }
1805 return codec
1806}
1807
1808// SetHeader sets the header metadata.
1809// When called multiple times, all the provided metadata will be merged.
1810// All the metadata will be sent out when one of the following happens:
1811// - grpc.SendHeader() is called;
1812// - The first response is sent out;
1813// - An RPC status is sent out (error or success).
1814func SetHeader(ctx context.Context, md metadata.MD) error {
1815 if md.Len() == 0 {
1816 return nil
1817 }
1818 stream := ServerTransportStreamFromContext(ctx)
1819 if stream == nil {
1820 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1821 }
1822 return stream.SetHeader(md)
1823}
1824
1825// SendHeader sends header metadata. It may be called at most once.
1826// The provided md and headers set by SetHeader() will be sent.
1827func SendHeader(ctx context.Context, md metadata.MD) error {
1828 stream := ServerTransportStreamFromContext(ctx)
1829 if stream == nil {
1830 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1831 }
1832 if err := stream.SendHeader(md); err != nil {
1833 return toRPCErr(err)
1834 }
1835 return nil
1836}
1837
1838// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1839// When called more than once, all the provided metadata will be merged.
1840func SetTrailer(ctx context.Context, md metadata.MD) error {
1841 if md.Len() == 0 {
1842 return nil
1843 }
1844 stream := ServerTransportStreamFromContext(ctx)
1845 if stream == nil {
1846 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1847 }
1848 return stream.SetTrailer(md)
1849}
1850
1851// Method returns the method string for the server context. The returned
1852// string is in the format of "/service/method".
1853func Method(ctx context.Context) (string, bool) {
1854 s := ServerTransportStreamFromContext(ctx)
1855 if s == nil {
1856 return "", false
1857 }
1858 return s.Method(), true
1859}
1860
1861type channelzServer struct {
1862 s *Server
1863}
1864
1865func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1866 return c.s.channelzMetric()
1867}