// blob: f064b73e555d40c0053010363ae683a7c34f432b [file] [log] [blame]
// Girish Gowdra 6450343 2020-01-07 10:59:10 +0530

/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/grpcsync"
46 "google.golang.org/grpc/internal/transport"
47 "google.golang.org/grpc/keepalive"
48 "google.golang.org/grpc/metadata"
49 "google.golang.org/grpc/peer"
50 "google.golang.org/grpc/stats"
51 "google.golang.org/grpc/status"
52 "google.golang.org/grpc/tap"
53)
54
// defaultServerMaxReceiveMessageSize is the receive-size limit, in bytes,
// stored in defaultServerOptions (4 MiB).
const defaultServerMaxReceiveMessageSize = 4 << 20

// defaultServerMaxSendMessageSize is the send-size limit, in bytes, stored in
// defaultServerOptions.
const defaultServerMaxSendMessageSize = math.MaxInt32
59
60var statusOK = status.New(codes.OK, "")
61
62type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
63
64// MethodDesc represents an RPC service's method specification.
65type MethodDesc struct {
66 MethodName string
67 Handler methodHandler
68}
69
70// ServiceDesc represents an RPC service's specification.
71type ServiceDesc struct {
72 ServiceName string
73 // The pointer to the service interface. Used to check whether the user
74 // provided implementation satisfies the interface requirements.
75 HandlerType interface{}
76 Methods []MethodDesc
77 Streams []StreamDesc
78 Metadata interface{}
79}
80
81// service consists of the information of the server serving this service and
82// the methods in this service.
83type service struct {
84 server interface{} // the server for service methods
85 md map[string]*MethodDesc
86 sd map[string]*StreamDesc
87 mdata interface{}
88}
89
90// Server is a gRPC server to serve RPC requests.
91type Server struct {
92 opts serverOptions
93
94 mu sync.Mutex // guards following
95 lis map[net.Listener]bool
96 conns map[transport.ServerTransport]bool
97 serve bool
98 drain bool
99 cv *sync.Cond // signaled when connections close for GracefulStop
100 m map[string]*service // service name -> service info
101 events trace.EventLog
102
103 quit *grpcsync.Event
104 done *grpcsync.Event
105 channelzRemoveOnce sync.Once
106 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
107
108 channelzID int64 // channelz unique identification number
109 czData *channelzData
110}
111
112type serverOptions struct {
113 creds credentials.TransportCredentials
114 codec baseCodec
115 cp Compressor
116 dc Decompressor
117 unaryInt UnaryServerInterceptor
118 streamInt StreamServerInterceptor
119 inTapHandle tap.ServerInHandle
120 statsHandler stats.Handler
121 maxConcurrentStreams uint32
122 maxReceiveMessageSize int
123 maxSendMessageSize int
124 unknownStreamDesc *StreamDesc
125 keepaliveParams keepalive.ServerParameters
126 keepalivePolicy keepalive.EnforcementPolicy
127 initialWindowSize int32
128 initialConnWindowSize int32
129 writeBufferSize int
130 readBufferSize int
131 connectionTimeout time.Duration
132 maxHeaderListSize *uint32
133}
134
135var defaultServerOptions = serverOptions{
136 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
137 maxSendMessageSize: defaultServerMaxSendMessageSize,
138 connectionTimeout: 120 * time.Second,
139 writeBufferSize: defaultWriteBufSize,
140 readBufferSize: defaultReadBufSize,
141}
142
143// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
144type ServerOption interface {
145 apply(*serverOptions)
146}
147
148// EmptyServerOption does not alter the server configuration. It can be embedded
149// in another structure to build custom server options.
150//
151// This API is EXPERIMENTAL.
152type EmptyServerOption struct{}
153
154func (EmptyServerOption) apply(*serverOptions) {}
155
156// funcServerOption wraps a function that modifies serverOptions into an
157// implementation of the ServerOption interface.
158type funcServerOption struct {
159 f func(*serverOptions)
160}
161
162func (fdo *funcServerOption) apply(do *serverOptions) {
163 fdo.f(do)
164}
165
166func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
167 return &funcServerOption{
168 f: f,
169 }
170}
171
172// WriteBufferSize determines how much data can be batched before doing a write on the wire.
173// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
174// The default value for this buffer is 32KB.
175// Zero will disable the write buffer such that each write will be on underlying connection.
176// Note: A Send call may not directly translate to a write.
177func WriteBufferSize(s int) ServerOption {
178 return newFuncServerOption(func(o *serverOptions) {
179 o.writeBufferSize = s
180 })
181}
182
183// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
184// for one read syscall.
185// The default value for this buffer is 32KB.
186// Zero will disable read buffer for a connection so data framer can access the underlying
187// conn directly.
188func ReadBufferSize(s int) ServerOption {
189 return newFuncServerOption(func(o *serverOptions) {
190 o.readBufferSize = s
191 })
192}
193
194// InitialWindowSize returns a ServerOption that sets window size for stream.
195// The lower bound for window size is 64K and any value smaller than that will be ignored.
196func InitialWindowSize(s int32) ServerOption {
197 return newFuncServerOption(func(o *serverOptions) {
198 o.initialWindowSize = s
199 })
200}
201
202// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
203// The lower bound for window size is 64K and any value smaller than that will be ignored.
204func InitialConnWindowSize(s int32) ServerOption {
205 return newFuncServerOption(func(o *serverOptions) {
206 o.initialConnWindowSize = s
207 })
208}
209
210// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
211func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
212 if kp.Time > 0 && kp.Time < time.Second {
213 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
214 kp.Time = time.Second
215 }
216
217 return newFuncServerOption(func(o *serverOptions) {
218 o.keepaliveParams = kp
219 })
220}
221
222// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
223func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
224 return newFuncServerOption(func(o *serverOptions) {
225 o.keepalivePolicy = kep
226 })
227}
228
229// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
230//
231// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
232func CustomCodec(codec Codec) ServerOption {
233 return newFuncServerOption(func(o *serverOptions) {
234 o.codec = codec
235 })
236}
237
238// RPCCompressor returns a ServerOption that sets a compressor for outbound
239// messages. For backward compatibility, all outbound messages will be sent
240// using this compressor, regardless of incoming message compression. By
241// default, server messages will be sent using the same compressor with which
242// request messages were sent.
243//
244// Deprecated: use encoding.RegisterCompressor instead.
245func RPCCompressor(cp Compressor) ServerOption {
246 return newFuncServerOption(func(o *serverOptions) {
247 o.cp = cp
248 })
249}
250
251// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
252// messages. It has higher priority than decompressors registered via
253// encoding.RegisterCompressor.
254//
255// Deprecated: use encoding.RegisterCompressor instead.
256func RPCDecompressor(dc Decompressor) ServerOption {
257 return newFuncServerOption(func(o *serverOptions) {
258 o.dc = dc
259 })
260}
261
262// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
263// If this is not set, gRPC uses the default limit.
264//
265// Deprecated: use MaxRecvMsgSize instead.
266func MaxMsgSize(m int) ServerOption {
267 return MaxRecvMsgSize(m)
268}
269
270// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
271// If this is not set, gRPC uses the default 4MB.
272func MaxRecvMsgSize(m int) ServerOption {
273 return newFuncServerOption(func(o *serverOptions) {
274 o.maxReceiveMessageSize = m
275 })
276}
277
278// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
279// If this is not set, gRPC uses the default `math.MaxInt32`.
280func MaxSendMsgSize(m int) ServerOption {
281 return newFuncServerOption(func(o *serverOptions) {
282 o.maxSendMessageSize = m
283 })
284}
285
286// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
287// of concurrent streams to each ServerTransport.
288func MaxConcurrentStreams(n uint32) ServerOption {
289 return newFuncServerOption(func(o *serverOptions) {
290 o.maxConcurrentStreams = n
291 })
292}
293
294// Creds returns a ServerOption that sets credentials for server connections.
295func Creds(c credentials.TransportCredentials) ServerOption {
296 return newFuncServerOption(func(o *serverOptions) {
297 o.creds = c
298 })
299}
300
301// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
302// server. Only one unary interceptor can be installed. The construction of multiple
303// interceptors (e.g., chaining) can be implemented at the caller.
304func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
305 return newFuncServerOption(func(o *serverOptions) {
306 if o.unaryInt != nil {
307 panic("The unary server interceptor was already set and may not be reset.")
308 }
309 o.unaryInt = i
310 })
311}
312
313// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
314// server. Only one stream interceptor can be installed.
315func StreamInterceptor(i StreamServerInterceptor) ServerOption {
316 return newFuncServerOption(func(o *serverOptions) {
317 if o.streamInt != nil {
318 panic("The stream server interceptor was already set and may not be reset.")
319 }
320 o.streamInt = i
321 })
322}
323
324// InTapHandle returns a ServerOption that sets the tap handle for all the server
325// transport to be created. Only one can be installed.
326func InTapHandle(h tap.ServerInHandle) ServerOption {
327 return newFuncServerOption(func(o *serverOptions) {
328 if o.inTapHandle != nil {
329 panic("The tap handle was already set and may not be reset.")
330 }
331 o.inTapHandle = h
332 })
333}
334
335// StatsHandler returns a ServerOption that sets the stats handler for the server.
336func StatsHandler(h stats.Handler) ServerOption {
337 return newFuncServerOption(func(o *serverOptions) {
338 o.statsHandler = h
339 })
340}
341
342// UnknownServiceHandler returns a ServerOption that allows for adding a custom
343// unknown service handler. The provided method is a bidi-streaming RPC service
344// handler that will be invoked instead of returning the "unimplemented" gRPC
345// error whenever a request is received for an unregistered service or method.
346// The handling function has full access to the Context of the request and the
347// stream, and the invocation bypasses interceptors.
348func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
349 return newFuncServerOption(func(o *serverOptions) {
350 o.unknownStreamDesc = &StreamDesc{
351 StreamName: "unknown_service_handler",
352 Handler: streamHandler,
353 // We need to assume that the users of the streamHandler will want to use both.
354 ClientStreams: true,
355 ServerStreams: true,
356 }
357 })
358}
359
360// ConnectionTimeout returns a ServerOption that sets the timeout for
361// connection establishment (up to and including HTTP/2 handshaking) for all
362// new connections. If this is not set, the default is 120 seconds. A zero or
363// negative value will result in an immediate timeout.
364//
365// This API is EXPERIMENTAL.
366func ConnectionTimeout(d time.Duration) ServerOption {
367 return newFuncServerOption(func(o *serverOptions) {
368 o.connectionTimeout = d
369 })
370}
371
372// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
373// of header list that the server is prepared to accept.
374func MaxHeaderListSize(s uint32) ServerOption {
375 return newFuncServerOption(func(o *serverOptions) {
376 o.maxHeaderListSize = &s
377 })
378}
379
380// NewServer creates a gRPC server which has no service registered and has not
381// started to accept requests yet.
382func NewServer(opt ...ServerOption) *Server {
383 opts := defaultServerOptions
384 for _, o := range opt {
385 o.apply(&opts)
386 }
387 s := &Server{
388 lis: make(map[net.Listener]bool),
389 opts: opts,
390 conns: make(map[transport.ServerTransport]bool),
391 m: make(map[string]*service),
392 quit: grpcsync.NewEvent(),
393 done: grpcsync.NewEvent(),
394 czData: new(channelzData),
395 }
396 s.cv = sync.NewCond(&s.mu)
397 if EnableTracing {
398 _, file, line, _ := runtime.Caller(1)
399 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
400 }
401
402 if channelz.IsOn() {
403 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
404 }
405 return s
406}
407
408// printf records an event in s's event log, unless s has been stopped.
409// REQUIRES s.mu is held.
410func (s *Server) printf(format string, a ...interface{}) {
411 if s.events != nil {
412 s.events.Printf(format, a...)
413 }
414}
415
416// errorf records an error in s's event log, unless s has been stopped.
417// REQUIRES s.mu is held.
418func (s *Server) errorf(format string, a ...interface{}) {
419 if s.events != nil {
420 s.events.Errorf(format, a...)
421 }
422}
423
424// RegisterService registers a service and its implementation to the gRPC
425// server. It is called from the IDL generated code. This must be called before
426// invoking Serve.
427func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
428 ht := reflect.TypeOf(sd.HandlerType).Elem()
429 st := reflect.TypeOf(ss)
430 if !st.Implements(ht) {
431 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
432 }
433 s.register(sd, ss)
434}
435
436func (s *Server) register(sd *ServiceDesc, ss interface{}) {
437 s.mu.Lock()
438 defer s.mu.Unlock()
439 s.printf("RegisterService(%q)", sd.ServiceName)
440 if s.serve {
441 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
442 }
443 if _, ok := s.m[sd.ServiceName]; ok {
444 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
445 }
446 srv := &service{
447 server: ss,
448 md: make(map[string]*MethodDesc),
449 sd: make(map[string]*StreamDesc),
450 mdata: sd.Metadata,
451 }
452 for i := range sd.Methods {
453 d := &sd.Methods[i]
454 srv.md[d.MethodName] = d
455 }
456 for i := range sd.Streams {
457 d := &sd.Streams[i]
458 srv.sd[d.StreamName] = d
459 }
460 s.m[sd.ServiceName] = srv
461}
462
// MethodInfo describes one RPC of a service: its name and streaming type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string

	// IsClientStream reports whether the RPC is client streaming.
	IsClientStream bool

	// IsServerStream reports whether the RPC is server streaming.
	IsServerStream bool
}
472
473// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
474type ServiceInfo struct {
475 Methods []MethodInfo
476 // Metadata is the metadata specified in ServiceDesc when registering service.
477 Metadata interface{}
478}
479
480// GetServiceInfo returns a map from service names to ServiceInfo.
481// Service names include the package names, in the form of <package>.<service>.
482func (s *Server) GetServiceInfo() map[string]ServiceInfo {
483 ret := make(map[string]ServiceInfo)
484 for n, srv := range s.m {
485 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
486 for m := range srv.md {
487 methods = append(methods, MethodInfo{
488 Name: m,
489 IsClientStream: false,
490 IsServerStream: false,
491 })
492 }
493 for m, d := range srv.sd {
494 methods = append(methods, MethodInfo{
495 Name: m,
496 IsClientStream: d.ClientStreams,
497 IsServerStream: d.ServerStreams,
498 })
499 }
500
501 ret[n] = ServiceInfo{
502 Methods: methods,
503 Metadata: srv.mdata,
504 }
505 }
506 return ret
507}
508
// ErrServerStopped is returned when an operation is no longer legal because
// the server has been stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
512
513func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
514 if s.opts.creds == nil {
515 return rawConn, nil, nil
516 }
517 return s.opts.creds.ServerHandshake(rawConn)
518}
519
// listenSocket pairs a net.Listener with the channelz ID it was registered
// under.
type listenSocket struct {
	net.Listener

	// channelzID is the ID returned by channelz when the listener was
	// registered; it stays zero when channelz is off.
	channelzID int64
}
524
525func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
526 return &channelz.SocketInternalMetric{
527 SocketOptions: channelz.GetSocketOption(l.Listener),
528 LocalAddr: l.Listener.Addr(),
529 }
530}
531
532func (l *listenSocket) Close() error {
533 err := l.Listener.Close()
534 if channelz.IsOn() {
535 channelz.RemoveEntry(l.channelzID)
536 }
537 return err
538}
539
540// Serve accepts incoming connections on the listener lis, creating a new
541// ServerTransport and service goroutine for each. The service goroutines
542// read gRPC requests and then call the registered handlers to reply to them.
543// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
544// this method returns.
545// Serve will return a non-nil error unless Stop or GracefulStop is called.
546func (s *Server) Serve(lis net.Listener) error {
547 s.mu.Lock()
548 s.printf("serving")
549 s.serve = true
550 if s.lis == nil {
551 // Serve called after Stop or GracefulStop.
552 s.mu.Unlock()
553 lis.Close()
554 return ErrServerStopped
555 }
556
557 s.serveWG.Add(1)
558 defer func() {
559 s.serveWG.Done()
560 if s.quit.HasFired() {
561 // Stop or GracefulStop called; block until done and return nil.
562 <-s.done.Done()
563 }
564 }()
565
566 ls := &listenSocket{Listener: lis}
567 s.lis[ls] = true
568
569 if channelz.IsOn() {
570 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
571 }
572 s.mu.Unlock()
573
574 defer func() {
575 s.mu.Lock()
576 if s.lis != nil && s.lis[ls] {
577 ls.Close()
578 delete(s.lis, ls)
579 }
580 s.mu.Unlock()
581 }()
582
583 var tempDelay time.Duration // how long to sleep on accept failure
584
585 for {
586 rawConn, err := lis.Accept()
587 if err != nil {
588 if ne, ok := err.(interface {
589 Temporary() bool
590 }); ok && ne.Temporary() {
591 if tempDelay == 0 {
592 tempDelay = 5 * time.Millisecond
593 } else {
594 tempDelay *= 2
595 }
596 if max := 1 * time.Second; tempDelay > max {
597 tempDelay = max
598 }
599 s.mu.Lock()
600 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
601 s.mu.Unlock()
602 timer := time.NewTimer(tempDelay)
603 select {
604 case <-timer.C:
605 case <-s.quit.Done():
606 timer.Stop()
607 return nil
608 }
609 continue
610 }
611 s.mu.Lock()
612 s.printf("done serving; Accept = %v", err)
613 s.mu.Unlock()
614
615 if s.quit.HasFired() {
616 return nil
617 }
618 return err
619 }
620 tempDelay = 0
621 // Start a new goroutine to deal with rawConn so we don't stall this Accept
622 // loop goroutine.
623 //
624 // Make sure we account for the goroutine so GracefulStop doesn't nil out
625 // s.conns before this conn can be added.
626 s.serveWG.Add(1)
627 go func() {
628 s.handleRawConn(rawConn)
629 s.serveWG.Done()
630 }()
631 }
632}
633
634// handleRawConn forks a goroutine to handle a just-accepted connection that
635// has not had any I/O performed on it yet.
636func (s *Server) handleRawConn(rawConn net.Conn) {
637 if s.quit.HasFired() {
638 rawConn.Close()
639 return
640 }
641 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
642 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
643 if err != nil {
644 // ErrConnDispatched means that the connection was dispatched away from
645 // gRPC; those connections should be left open.
646 if err != credentials.ErrConnDispatched {
647 s.mu.Lock()
648 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
649 s.mu.Unlock()
650 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
651 rawConn.Close()
652 }
653 rawConn.SetDeadline(time.Time{})
654 return
655 }
656
657 // Finish handshaking (HTTP2)
658 st := s.newHTTP2Transport(conn, authInfo)
659 if st == nil {
660 return
661 }
662
663 rawConn.SetDeadline(time.Time{})
664 if !s.addConn(st) {
665 return
666 }
667 go func() {
668 s.serveStreams(st)
669 s.removeConn(st)
670 }()
671}
672
673// newHTTP2Transport sets up a http/2 transport (using the
674// gRPC http2 server transport in transport/http2_server.go).
675func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
676 config := &transport.ServerConfig{
677 MaxStreams: s.opts.maxConcurrentStreams,
678 AuthInfo: authInfo,
679 InTapHandle: s.opts.inTapHandle,
680 StatsHandler: s.opts.statsHandler,
681 KeepaliveParams: s.opts.keepaliveParams,
682 KeepalivePolicy: s.opts.keepalivePolicy,
683 InitialWindowSize: s.opts.initialWindowSize,
684 InitialConnWindowSize: s.opts.initialConnWindowSize,
685 WriteBufferSize: s.opts.writeBufferSize,
686 ReadBufferSize: s.opts.readBufferSize,
687 ChannelzParentID: s.channelzID,
688 MaxHeaderListSize: s.opts.maxHeaderListSize,
689 }
690 st, err := transport.NewServerTransport("http2", c, config)
691 if err != nil {
692 s.mu.Lock()
693 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
694 s.mu.Unlock()
695 c.Close()
696 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
697 return nil
698 }
699
700 return st
701}
702
703func (s *Server) serveStreams(st transport.ServerTransport) {
704 defer st.Close()
705 var wg sync.WaitGroup
706 st.HandleStreams(func(stream *transport.Stream) {
707 wg.Add(1)
708 go func() {
709 defer wg.Done()
710 s.handleStream(st, stream, s.traceInfo(st, stream))
711 }()
712 }, func(ctx context.Context, method string) context.Context {
713 if !EnableTracing {
714 return ctx
715 }
716 tr := trace.New("grpc.Recv."+methodFamily(method), method)
717 return trace.NewContext(ctx, tr)
718 })
719 wg.Wait()
720}
721
722var _ http.Handler = (*Server)(nil)
723
724// ServeHTTP implements the Go standard library's http.Handler
725// interface by responding to the gRPC request r, by looking up
726// the requested gRPC method in the gRPC server s.
727//
728// The provided HTTP request must have arrived on an HTTP/2
729// connection. When using the Go standard library's server,
730// practically this means that the Request must also have arrived
731// over TLS.
732//
733// To share one port (such as 443 for https) between gRPC and an
734// existing http.Handler, use a root http.Handler such as:
735//
736// if r.ProtoMajor == 2 && strings.HasPrefix(
737// r.Header.Get("Content-Type"), "application/grpc") {
738// grpcServer.ServeHTTP(w, r)
739// } else {
740// yourMux.ServeHTTP(w, r)
741// }
742//
743// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
744// separate from grpc-go's HTTP/2 server. Performance and features may vary
745// between the two paths. ServeHTTP does not support some gRPC features
746// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
747// and subject to change.
748func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
749 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
750 if err != nil {
751 http.Error(w, err.Error(), http.StatusInternalServerError)
752 return
753 }
754 if !s.addConn(st) {
755 return
756 }
757 defer s.removeConn(st)
758 s.serveStreams(st)
759}
760
761// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
762// If tracing is not enabled, it returns nil.
763func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
764 if !EnableTracing {
765 return nil
766 }
767 tr, ok := trace.FromContext(stream.Context())
768 if !ok {
769 return nil
770 }
771
772 trInfo = &traceInfo{
773 tr: tr,
774 firstLine: firstLine{
775 client: false,
776 remoteAddr: st.RemoteAddr(),
777 },
778 }
779 if dl, ok := stream.Context().Deadline(); ok {
780 trInfo.firstLine.deadline = time.Until(dl)
781 }
782 return trInfo
783}
784
785func (s *Server) addConn(st transport.ServerTransport) bool {
786 s.mu.Lock()
787 defer s.mu.Unlock()
788 if s.conns == nil {
789 st.Close()
790 return false
791 }
792 if s.drain {
793 // Transport added after we drained our existing conns: drain it
794 // immediately.
795 st.Drain()
796 }
797 s.conns[st] = true
798 return true
799}
800
801func (s *Server) removeConn(st transport.ServerTransport) {
802 s.mu.Lock()
803 defer s.mu.Unlock()
804 if s.conns != nil {
805 delete(s.conns, st)
806 s.cv.Broadcast()
807 }
808}
809
810func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
811 return &channelz.ServerInternalMetric{
812 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
813 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
814 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
815 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
816 }
817}
818
819func (s *Server) incrCallsStarted() {
820 atomic.AddInt64(&s.czData.callsStarted, 1)
821 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
822}
823
824func (s *Server) incrCallsSucceeded() {
825 atomic.AddInt64(&s.czData.callsSucceeded, 1)
826}
827
828func (s *Server) incrCallsFailed() {
829 atomic.AddInt64(&s.czData.callsFailed, 1)
830}
831
832func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
833 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
834 if err != nil {
835 grpclog.Errorln("grpc: server failed to encode response: ", err)
836 return err
837 }
838 compData, err := compress(data, cp, comp)
839 if err != nil {
840 grpclog.Errorln("grpc: server failed to compress response: ", err)
841 return err
842 }
843 hdr, payload := msgHeader(data, compData)
844 // TODO(dfawley): should we be checking len(data) instead?
845 if len(payload) > s.opts.maxSendMessageSize {
846 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
847 }
848 err = t.Write(stream, hdr, payload, opts)
849 if err == nil && s.opts.statsHandler != nil {
850 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
851 }
852 return err
853}
854
855func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
856 if channelz.IsOn() {
857 s.incrCallsStarted()
858 defer func() {
859 if err != nil && err != io.EOF {
860 s.incrCallsFailed()
861 } else {
862 s.incrCallsSucceeded()
863 }
864 }()
865 }
866 sh := s.opts.statsHandler
867 if sh != nil {
868 beginTime := time.Now()
869 begin := &stats.Begin{
870 BeginTime: beginTime,
871 }
872 sh.HandleRPC(stream.Context(), begin)
873 defer func() {
874 end := &stats.End{
875 BeginTime: beginTime,
876 EndTime: time.Now(),
877 }
878 if err != nil && err != io.EOF {
879 end.Error = toRPCErr(err)
880 }
881 sh.HandleRPC(stream.Context(), end)
882 }()
883 }
884 if trInfo != nil {
885 defer trInfo.tr.Finish()
886 trInfo.tr.LazyLog(&trInfo.firstLine, false)
887 defer func() {
888 if err != nil && err != io.EOF {
889 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
890 trInfo.tr.SetError()
891 }
892 }()
893 }
894
895 binlog := binarylog.GetMethodLogger(stream.Method())
896 if binlog != nil {
897 ctx := stream.Context()
898 md, _ := metadata.FromIncomingContext(ctx)
899 logEntry := &binarylog.ClientHeader{
900 Header: md,
901 MethodName: stream.Method(),
902 PeerAddr: nil,
903 }
904 if deadline, ok := ctx.Deadline(); ok {
905 logEntry.Timeout = time.Until(deadline)
906 if logEntry.Timeout < 0 {
907 logEntry.Timeout = 0
908 }
909 }
910 if a := md[":authority"]; len(a) > 0 {
911 logEntry.Authority = a[0]
912 }
913 if peer, ok := peer.FromContext(ctx); ok {
914 logEntry.PeerAddr = peer.Addr
915 }
916 binlog.Log(logEntry)
917 }
918
919 // comp and cp are used for compression. decomp and dc are used for
920 // decompression. If comp and decomp are both set, they are the same;
921 // however they are kept separate to ensure that at most one of the
922 // compressor/decompressor variable pairs are set for use later.
923 var comp, decomp encoding.Compressor
924 var cp Compressor
925 var dc Decompressor
926
927 // If dc is set and matches the stream's compression, use it. Otherwise, try
928 // to find a matching registered compressor for decomp.
929 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
930 dc = s.opts.dc
931 } else if rc != "" && rc != encoding.Identity {
932 decomp = encoding.GetCompressor(rc)
933 if decomp == nil {
934 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
935 t.WriteStatus(stream, st)
936 return st.Err()
937 }
938 }
939
940 // If cp is set, use it. Otherwise, attempt to compress the response using
941 // the incoming message compression method.
942 //
943 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
944 if s.opts.cp != nil {
945 cp = s.opts.cp
946 stream.SetSendCompress(cp.Type())
947 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
948 // Legacy compressor not specified; attempt to respond with same encoding.
949 comp = encoding.GetCompressor(rc)
950 if comp != nil {
951 stream.SetSendCompress(rc)
952 }
953 }
954
955 var payInfo *payloadInfo
956 if sh != nil || binlog != nil {
957 payInfo = &payloadInfo{}
958 }
959 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
960 if err != nil {
961 if st, ok := status.FromError(err); ok {
962 if e := t.WriteStatus(stream, st); e != nil {
963 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
964 }
965 }
966 return err
967 }
968 if channelz.IsOn() {
969 t.IncrMsgRecv()
970 }
971 df := func(v interface{}) error {
972 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
973 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
974 }
975 if sh != nil {
976 sh.HandleRPC(stream.Context(), &stats.InPayload{
977 RecvTime: time.Now(),
978 Payload: v,
979 WireLength: payInfo.wireLength,
980 Data: d,
981 Length: len(d),
982 })
983 }
984 if binlog != nil {
985 binlog.Log(&binarylog.ClientMessage{
986 Message: d,
987 })
988 }
989 if trInfo != nil {
990 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
991 }
992 return nil
993 }
994 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
995 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
996 if appErr != nil {
997 appStatus, ok := status.FromError(appErr)
998 if !ok {
999 // Convert appErr if it is not a grpc status error.
1000 appErr = status.Error(codes.Unknown, appErr.Error())
1001 appStatus, _ = status.FromError(appErr)
1002 }
1003 if trInfo != nil {
1004 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1005 trInfo.tr.SetError()
1006 }
1007 if e := t.WriteStatus(stream, appStatus); e != nil {
1008 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1009 }
1010 if binlog != nil {
1011 if h, _ := stream.Header(); h.Len() > 0 {
1012 // Only log serverHeader if there was header. Otherwise it can
1013 // be trailer only.
1014 binlog.Log(&binarylog.ServerHeader{
1015 Header: h,
1016 })
1017 }
1018 binlog.Log(&binarylog.ServerTrailer{
1019 Trailer: stream.Trailer(),
1020 Err: appErr,
1021 })
1022 }
1023 return appErr
1024 }
1025 if trInfo != nil {
1026 trInfo.tr.LazyLog(stringer("OK"), false)
1027 }
1028 opts := &transport.Options{Last: true}
1029
1030 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1031 if err == io.EOF {
1032 // The entire stream is done (for unary RPC only).
1033 return err
1034 }
1035 if s, ok := status.FromError(err); ok {
1036 if e := t.WriteStatus(stream, s); e != nil {
1037 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1038 }
1039 } else {
1040 switch st := err.(type) {
1041 case transport.ConnectionError:
1042 // Nothing to do here.
1043 default:
1044 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1045 }
1046 }
1047 if binlog != nil {
1048 h, _ := stream.Header()
1049 binlog.Log(&binarylog.ServerHeader{
1050 Header: h,
1051 })
1052 binlog.Log(&binarylog.ServerTrailer{
1053 Trailer: stream.Trailer(),
1054 Err: appErr,
1055 })
1056 }
1057 return err
1058 }
1059 if binlog != nil {
1060 h, _ := stream.Header()
1061 binlog.Log(&binarylog.ServerHeader{
1062 Header: h,
1063 })
1064 binlog.Log(&binarylog.ServerMessage{
1065 Message: reply,
1066 })
1067 }
1068 if channelz.IsOn() {
1069 t.IncrMsgSent()
1070 }
1071 if trInfo != nil {
1072 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1073 }
1074 // TODO: Should we be logging if writing status failed here, like above?
1075 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1076 // error or allow the stats handler to see it?
1077 err = t.WriteStatus(stream, statusOK)
1078 if binlog != nil {
1079 binlog.Log(&binarylog.ServerTrailer{
1080 Trailer: stream.Trailer(),
1081 Err: appErr,
1082 })
1083 }
1084 return err
1085}
1086
1087func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1088 if channelz.IsOn() {
1089 s.incrCallsStarted()
1090 defer func() {
1091 if err != nil && err != io.EOF {
1092 s.incrCallsFailed()
1093 } else {
1094 s.incrCallsSucceeded()
1095 }
1096 }()
1097 }
1098 sh := s.opts.statsHandler
1099 if sh != nil {
1100 beginTime := time.Now()
1101 begin := &stats.Begin{
1102 BeginTime: beginTime,
1103 }
1104 sh.HandleRPC(stream.Context(), begin)
1105 defer func() {
1106 end := &stats.End{
1107 BeginTime: beginTime,
1108 EndTime: time.Now(),
1109 }
1110 if err != nil && err != io.EOF {
1111 end.Error = toRPCErr(err)
1112 }
1113 sh.HandleRPC(stream.Context(), end)
1114 }()
1115 }
1116 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1117 ss := &serverStream{
1118 ctx: ctx,
1119 t: t,
1120 s: stream,
1121 p: &parser{r: stream},
1122 codec: s.getCodec(stream.ContentSubtype()),
1123 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1124 maxSendMessageSize: s.opts.maxSendMessageSize,
1125 trInfo: trInfo,
1126 statsHandler: sh,
1127 }
1128
1129 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1130 if ss.binlog != nil {
1131 md, _ := metadata.FromIncomingContext(ctx)
1132 logEntry := &binarylog.ClientHeader{
1133 Header: md,
1134 MethodName: stream.Method(),
1135 PeerAddr: nil,
1136 }
1137 if deadline, ok := ctx.Deadline(); ok {
1138 logEntry.Timeout = time.Until(deadline)
1139 if logEntry.Timeout < 0 {
1140 logEntry.Timeout = 0
1141 }
1142 }
1143 if a := md[":authority"]; len(a) > 0 {
1144 logEntry.Authority = a[0]
1145 }
1146 if peer, ok := peer.FromContext(ss.Context()); ok {
1147 logEntry.PeerAddr = peer.Addr
1148 }
1149 ss.binlog.Log(logEntry)
1150 }
1151
1152 // If dc is set and matches the stream's compression, use it. Otherwise, try
1153 // to find a matching registered compressor for decomp.
1154 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1155 ss.dc = s.opts.dc
1156 } else if rc != "" && rc != encoding.Identity {
1157 ss.decomp = encoding.GetCompressor(rc)
1158 if ss.decomp == nil {
1159 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1160 t.WriteStatus(ss.s, st)
1161 return st.Err()
1162 }
1163 }
1164
1165 // If cp is set, use it. Otherwise, attempt to compress the response using
1166 // the incoming message compression method.
1167 //
1168 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1169 if s.opts.cp != nil {
1170 ss.cp = s.opts.cp
1171 stream.SetSendCompress(s.opts.cp.Type())
1172 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1173 // Legacy compressor not specified; attempt to respond with same encoding.
1174 ss.comp = encoding.GetCompressor(rc)
1175 if ss.comp != nil {
1176 stream.SetSendCompress(rc)
1177 }
1178 }
1179
1180 if trInfo != nil {
1181 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1182 defer func() {
1183 ss.mu.Lock()
1184 if err != nil && err != io.EOF {
1185 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1186 ss.trInfo.tr.SetError()
1187 }
1188 ss.trInfo.tr.Finish()
1189 ss.trInfo.tr = nil
1190 ss.mu.Unlock()
1191 }()
1192 }
1193 var appErr error
1194 var server interface{}
1195 if srv != nil {
1196 server = srv.server
1197 }
1198 if s.opts.streamInt == nil {
1199 appErr = sd.Handler(server, ss)
1200 } else {
1201 info := &StreamServerInfo{
1202 FullMethod: stream.Method(),
1203 IsClientStream: sd.ClientStreams,
1204 IsServerStream: sd.ServerStreams,
1205 }
1206 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1207 }
1208 if appErr != nil {
1209 appStatus, ok := status.FromError(appErr)
1210 if !ok {
1211 appStatus = status.New(codes.Unknown, appErr.Error())
1212 appErr = appStatus.Err()
1213 }
1214 if trInfo != nil {
1215 ss.mu.Lock()
1216 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1217 ss.trInfo.tr.SetError()
1218 ss.mu.Unlock()
1219 }
1220 t.WriteStatus(ss.s, appStatus)
1221 if ss.binlog != nil {
1222 ss.binlog.Log(&binarylog.ServerTrailer{
1223 Trailer: ss.s.Trailer(),
1224 Err: appErr,
1225 })
1226 }
1227 // TODO: Should we log an error from WriteStatus here and below?
1228 return appErr
1229 }
1230 if trInfo != nil {
1231 ss.mu.Lock()
1232 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1233 ss.mu.Unlock()
1234 }
1235 err = t.WriteStatus(ss.s, statusOK)
1236 if ss.binlog != nil {
1237 ss.binlog.Log(&binarylog.ServerTrailer{
1238 Trailer: ss.s.Trailer(),
1239 Err: appErr,
1240 })
1241 }
1242 return err
1243}
1244
1245func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1246 sm := stream.Method()
1247 if sm != "" && sm[0] == '/' {
1248 sm = sm[1:]
1249 }
1250 pos := strings.LastIndex(sm, "/")
1251 if pos == -1 {
1252 if trInfo != nil {
1253 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1254 trInfo.tr.SetError()
1255 }
1256 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1257 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1258 if trInfo != nil {
1259 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1260 trInfo.tr.SetError()
1261 }
1262 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1263 }
1264 if trInfo != nil {
1265 trInfo.tr.Finish()
1266 }
1267 return
1268 }
1269 service := sm[:pos]
1270 method := sm[pos+1:]
1271
1272 srv, knownService := s.m[service]
1273 if knownService {
1274 if md, ok := srv.md[method]; ok {
1275 s.processUnaryRPC(t, stream, srv, md, trInfo)
1276 return
1277 }
1278 if sd, ok := srv.sd[method]; ok {
1279 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1280 return
1281 }
1282 }
1283 // Unknown service, or known server unknown method.
1284 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1285 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1286 return
1287 }
1288 var errDesc string
1289 if !knownService {
1290 errDesc = fmt.Sprintf("unknown service %v", service)
1291 } else {
1292 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1293 }
1294 if trInfo != nil {
1295 trInfo.tr.LazyPrintf("%s", errDesc)
1296 trInfo.tr.SetError()
1297 }
1298 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1299 if trInfo != nil {
1300 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1301 trInfo.tr.SetError()
1302 }
1303 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1304 }
1305 if trInfo != nil {
1306 trInfo.tr.Finish()
1307 }
1308}
1309
// streamKey is the context key under which the ServerTransportStream is
// stored. It is an unexported empty struct so no other package can collide
// with it.
type streamKey struct{}
1312
1313// NewContextWithServerTransportStream creates a new context from ctx and
1314// attaches stream to it.
1315//
1316// This API is EXPERIMENTAL.
1317func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1318 return context.WithValue(ctx, streamKey{}, stream)
1319}
1320
1321// ServerTransportStream is a minimal interface that a transport stream must
1322// implement. This can be used to mock an actual transport stream for tests of
1323// handler code that use, for example, grpc.SetHeader (which requires some
1324// stream to be in context).
1325//
1326// See also NewContextWithServerTransportStream.
1327//
1328// This API is EXPERIMENTAL.
1329type ServerTransportStream interface {
1330 Method() string
1331 SetHeader(md metadata.MD) error
1332 SendHeader(md metadata.MD) error
1333 SetTrailer(md metadata.MD) error
1334}
1335
1336// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1337// ctx. Returns nil if the given context has no stream associated with it
1338// (which implies it is not an RPC invocation context).
1339//
1340// This API is EXPERIMENTAL.
1341func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1342 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1343 return s
1344}
1345
1346// Stop stops the gRPC server. It immediately closes all open
1347// connections and listeners.
1348// It cancels all active RPCs on the server side and the corresponding
1349// pending RPCs on the client side will get notified by connection
1350// errors.
1351func (s *Server) Stop() {
1352 s.quit.Fire()
1353
1354 defer func() {
1355 s.serveWG.Wait()
1356 s.done.Fire()
1357 }()
1358
1359 s.channelzRemoveOnce.Do(func() {
1360 if channelz.IsOn() {
1361 channelz.RemoveEntry(s.channelzID)
1362 }
1363 })
1364
1365 s.mu.Lock()
1366 listeners := s.lis
1367 s.lis = nil
1368 st := s.conns
1369 s.conns = nil
1370 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1371 s.cv.Broadcast()
1372 s.mu.Unlock()
1373
1374 for lis := range listeners {
1375 lis.Close()
1376 }
1377 for c := range st {
1378 c.Close()
1379 }
1380
1381 s.mu.Lock()
1382 if s.events != nil {
1383 s.events.Finish()
1384 s.events = nil
1385 }
1386 s.mu.Unlock()
1387}
1388
1389// GracefulStop stops the gRPC server gracefully. It stops the server from
1390// accepting new connections and RPCs and blocks until all the pending RPCs are
1391// finished.
1392func (s *Server) GracefulStop() {
1393 s.quit.Fire()
1394 defer s.done.Fire()
1395
1396 s.channelzRemoveOnce.Do(func() {
1397 if channelz.IsOn() {
1398 channelz.RemoveEntry(s.channelzID)
1399 }
1400 })
1401 s.mu.Lock()
1402 if s.conns == nil {
1403 s.mu.Unlock()
1404 return
1405 }
1406
1407 for lis := range s.lis {
1408 lis.Close()
1409 }
1410 s.lis = nil
1411 if !s.drain {
1412 for st := range s.conns {
1413 st.Drain()
1414 }
1415 s.drain = true
1416 }
1417
1418 // Wait for serving threads to be ready to exit. Only then can we be sure no
1419 // new conns will be created.
1420 s.mu.Unlock()
1421 s.serveWG.Wait()
1422 s.mu.Lock()
1423
1424 for len(s.conns) != 0 {
1425 s.cv.Wait()
1426 }
1427 s.conns = nil
1428 if s.events != nil {
1429 s.events.Finish()
1430 s.events = nil
1431 }
1432 s.mu.Unlock()
1433}
1434
1435// contentSubtype must be lowercase
1436// cannot return nil
1437func (s *Server) getCodec(contentSubtype string) baseCodec {
1438 if s.opts.codec != nil {
1439 return s.opts.codec
1440 }
1441 if contentSubtype == "" {
1442 return encoding.GetCodec(proto.Name)
1443 }
1444 codec := encoding.GetCodec(contentSubtype)
1445 if codec == nil {
1446 return encoding.GetCodec(proto.Name)
1447 }
1448 return codec
1449}
1450
1451// SetHeader sets the header metadata.
1452// When called multiple times, all the provided metadata will be merged.
1453// All the metadata will be sent out when one of the following happens:
1454// - grpc.SendHeader() is called;
1455// - The first response is sent out;
1456// - An RPC status is sent out (error or success).
1457func SetHeader(ctx context.Context, md metadata.MD) error {
1458 if md.Len() == 0 {
1459 return nil
1460 }
1461 stream := ServerTransportStreamFromContext(ctx)
1462 if stream == nil {
1463 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1464 }
1465 return stream.SetHeader(md)
1466}
1467
1468// SendHeader sends header metadata. It may be called at most once.
1469// The provided md and headers set by SetHeader() will be sent.
1470func SendHeader(ctx context.Context, md metadata.MD) error {
1471 stream := ServerTransportStreamFromContext(ctx)
1472 if stream == nil {
1473 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1474 }
1475 if err := stream.SendHeader(md); err != nil {
1476 return toRPCErr(err)
1477 }
1478 return nil
1479}
1480
1481// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1482// When called more than once, all the provided metadata will be merged.
1483func SetTrailer(ctx context.Context, md metadata.MD) error {
1484 if md.Len() == 0 {
1485 return nil
1486 }
1487 stream := ServerTransportStreamFromContext(ctx)
1488 if stream == nil {
1489 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1490 }
1491 return stream.SetTrailer(md)
1492}
1493
1494// Method returns the method string for the server context. The returned
1495// string is in the format of "/service/method".
1496func Method(ctx context.Context) (string, bool) {
1497 s := ServerTransportStreamFromContext(ctx)
1498 if s == nil {
1499 return "", false
1500 }
1501 return s.Method(), true
1502}
1503
1504type channelzServer struct {
1505 s *Server
1506}
1507
1508func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1509 return c.s.channelzMetric()
1510}