blob: 495a4f9b825ea58988181542bb5c25470a18c48a [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/transport"
46 "google.golang.org/grpc/keepalive"
47 "google.golang.org/grpc/metadata"
48 "google.golang.org/grpc/peer"
49 "google.golang.org/grpc/stats"
50 "google.golang.org/grpc/status"
51 "google.golang.org/grpc/tap"
52)
53
54const (
55 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
56 defaultServerMaxSendMessageSize = math.MaxInt32
57)
58
59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
61// MethodDesc represents an RPC service's method specification.
62type MethodDesc struct {
63 MethodName string
64 Handler methodHandler
65}
66
67// ServiceDesc represents an RPC service's specification.
68type ServiceDesc struct {
69 ServiceName string
70 // The pointer to the service interface. Used to check whether the user
71 // provided implementation satisfies the interface requirements.
72 HandlerType interface{}
73 Methods []MethodDesc
74 Streams []StreamDesc
75 Metadata interface{}
76}
77
78// service consists of the information of the server serving this service and
79// the methods in this service.
80type service struct {
81 server interface{} // the server for service methods
82 md map[string]*MethodDesc
83 sd map[string]*StreamDesc
84 mdata interface{}
85}
86
87// Server is a gRPC server to serve RPC requests.
88type Server struct {
89 opts serverOptions
90
91 mu sync.Mutex // guards following
92 lis map[net.Listener]bool
93 conns map[io.Closer]bool
94 serve bool
95 drain bool
96 cv *sync.Cond // signaled when connections close for GracefulStop
97 m map[string]*service // service name -> service info
98 events trace.EventLog
99
100 quit chan struct{}
101 done chan struct{}
102 quitOnce sync.Once
103 doneOnce sync.Once
104 channelzRemoveOnce sync.Once
105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107 channelzID int64 // channelz unique identification number
108 czData *channelzData
109}
110
111type serverOptions struct {
112 creds credentials.TransportCredentials
113 codec baseCodec
114 cp Compressor
115 dc Decompressor
116 unaryInt UnaryServerInterceptor
117 streamInt StreamServerInterceptor
118 inTapHandle tap.ServerInHandle
119 statsHandler stats.Handler
120 maxConcurrentStreams uint32
121 maxReceiveMessageSize int
122 maxSendMessageSize int
123 unknownStreamDesc *StreamDesc
124 keepaliveParams keepalive.ServerParameters
125 keepalivePolicy keepalive.EnforcementPolicy
126 initialWindowSize int32
127 initialConnWindowSize int32
128 writeBufferSize int
129 readBufferSize int
130 connectionTimeout time.Duration
131 maxHeaderListSize *uint32
132}
133
134var defaultServerOptions = serverOptions{
135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
136 maxSendMessageSize: defaultServerMaxSendMessageSize,
137 connectionTimeout: 120 * time.Second,
138 writeBufferSize: defaultWriteBufSize,
139 readBufferSize: defaultReadBufSize,
140}
141
142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
143type ServerOption interface {
144 apply(*serverOptions)
145}
146
147// EmptyServerOption does not alter the server configuration. It can be embedded
148// in another structure to build custom server options.
149//
150// This API is EXPERIMENTAL.
151type EmptyServerOption struct{}
152
153func (EmptyServerOption) apply(*serverOptions) {}
154
155// funcServerOption wraps a function that modifies serverOptions into an
156// implementation of the ServerOption interface.
157type funcServerOption struct {
158 f func(*serverOptions)
159}
160
161func (fdo *funcServerOption) apply(do *serverOptions) {
162 fdo.f(do)
163}
164
165func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
166 return &funcServerOption{
167 f: f,
168 }
169}
170
171// WriteBufferSize determines how much data can be batched before doing a write on the wire.
172// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
173// The default value for this buffer is 32KB.
174// Zero will disable the write buffer such that each write will be on underlying connection.
175// Note: A Send call may not directly translate to a write.
176func WriteBufferSize(s int) ServerOption {
177 return newFuncServerOption(func(o *serverOptions) {
178 o.writeBufferSize = s
179 })
180}
181
182// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
183// for one read syscall.
184// The default value for this buffer is 32KB.
185// Zero will disable read buffer for a connection so data framer can access the underlying
186// conn directly.
187func ReadBufferSize(s int) ServerOption {
188 return newFuncServerOption(func(o *serverOptions) {
189 o.readBufferSize = s
190 })
191}
192
193// InitialWindowSize returns a ServerOption that sets window size for stream.
194// The lower bound for window size is 64K and any value smaller than that will be ignored.
195func InitialWindowSize(s int32) ServerOption {
196 return newFuncServerOption(func(o *serverOptions) {
197 o.initialWindowSize = s
198 })
199}
200
201// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
202// The lower bound for window size is 64K and any value smaller than that will be ignored.
203func InitialConnWindowSize(s int32) ServerOption {
204 return newFuncServerOption(func(o *serverOptions) {
205 o.initialConnWindowSize = s
206 })
207}
208
209// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
210func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
211 if kp.Time > 0 && kp.Time < time.Second {
212 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
213 kp.Time = time.Second
214 }
215
216 return newFuncServerOption(func(o *serverOptions) {
217 o.keepaliveParams = kp
218 })
219}
220
221// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
222func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
223 return newFuncServerOption(func(o *serverOptions) {
224 o.keepalivePolicy = kep
225 })
226}
227
228// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
229//
230// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
231func CustomCodec(codec Codec) ServerOption {
232 return newFuncServerOption(func(o *serverOptions) {
233 o.codec = codec
234 })
235}
236
237// RPCCompressor returns a ServerOption that sets a compressor for outbound
238// messages. For backward compatibility, all outbound messages will be sent
239// using this compressor, regardless of incoming message compression. By
240// default, server messages will be sent using the same compressor with which
241// request messages were sent.
242//
243// Deprecated: use encoding.RegisterCompressor instead.
244func RPCCompressor(cp Compressor) ServerOption {
245 return newFuncServerOption(func(o *serverOptions) {
246 o.cp = cp
247 })
248}
249
250// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
251// messages. It has higher priority than decompressors registered via
252// encoding.RegisterCompressor.
253//
254// Deprecated: use encoding.RegisterCompressor instead.
255func RPCDecompressor(dc Decompressor) ServerOption {
256 return newFuncServerOption(func(o *serverOptions) {
257 o.dc = dc
258 })
259}
260
261// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
262// If this is not set, gRPC uses the default limit.
263//
264// Deprecated: use MaxRecvMsgSize instead.
265func MaxMsgSize(m int) ServerOption {
266 return MaxRecvMsgSize(m)
267}
268
269// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
270// If this is not set, gRPC uses the default 4MB.
271func MaxRecvMsgSize(m int) ServerOption {
272 return newFuncServerOption(func(o *serverOptions) {
273 o.maxReceiveMessageSize = m
274 })
275}
276
277// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
278// If this is not set, gRPC uses the default `math.MaxInt32`.
279func MaxSendMsgSize(m int) ServerOption {
280 return newFuncServerOption(func(o *serverOptions) {
281 o.maxSendMessageSize = m
282 })
283}
284
285// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
286// of concurrent streams to each ServerTransport.
287func MaxConcurrentStreams(n uint32) ServerOption {
288 return newFuncServerOption(func(o *serverOptions) {
289 o.maxConcurrentStreams = n
290 })
291}
292
293// Creds returns a ServerOption that sets credentials for server connections.
294func Creds(c credentials.TransportCredentials) ServerOption {
295 return newFuncServerOption(func(o *serverOptions) {
296 o.creds = c
297 })
298}
299
300// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
301// server. Only one unary interceptor can be installed. The construction of multiple
302// interceptors (e.g., chaining) can be implemented at the caller.
303func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
304 return newFuncServerOption(func(o *serverOptions) {
305 if o.unaryInt != nil {
306 panic("The unary server interceptor was already set and may not be reset.")
307 }
308 o.unaryInt = i
309 })
310}
311
312// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
313// server. Only one stream interceptor can be installed.
314func StreamInterceptor(i StreamServerInterceptor) ServerOption {
315 return newFuncServerOption(func(o *serverOptions) {
316 if o.streamInt != nil {
317 panic("The stream server interceptor was already set and may not be reset.")
318 }
319 o.streamInt = i
320 })
321}
322
323// InTapHandle returns a ServerOption that sets the tap handle for all the server
324// transport to be created. Only one can be installed.
325func InTapHandle(h tap.ServerInHandle) ServerOption {
326 return newFuncServerOption(func(o *serverOptions) {
327 if o.inTapHandle != nil {
328 panic("The tap handle was already set and may not be reset.")
329 }
330 o.inTapHandle = h
331 })
332}
333
334// StatsHandler returns a ServerOption that sets the stats handler for the server.
335func StatsHandler(h stats.Handler) ServerOption {
336 return newFuncServerOption(func(o *serverOptions) {
337 o.statsHandler = h
338 })
339}
340
341// UnknownServiceHandler returns a ServerOption that allows for adding a custom
342// unknown service handler. The provided method is a bidi-streaming RPC service
343// handler that will be invoked instead of returning the "unimplemented" gRPC
344// error whenever a request is received for an unregistered service or method.
345// The handling function has full access to the Context of the request and the
346// stream, and the invocation bypasses interceptors.
347func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
348 return newFuncServerOption(func(o *serverOptions) {
349 o.unknownStreamDesc = &StreamDesc{
350 StreamName: "unknown_service_handler",
351 Handler: streamHandler,
352 // We need to assume that the users of the streamHandler will want to use both.
353 ClientStreams: true,
354 ServerStreams: true,
355 }
356 })
357}
358
359// ConnectionTimeout returns a ServerOption that sets the timeout for
360// connection establishment (up to and including HTTP/2 handshaking) for all
361// new connections. If this is not set, the default is 120 seconds. A zero or
362// negative value will result in an immediate timeout.
363//
364// This API is EXPERIMENTAL.
365func ConnectionTimeout(d time.Duration) ServerOption {
366 return newFuncServerOption(func(o *serverOptions) {
367 o.connectionTimeout = d
368 })
369}
370
371// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
372// of header list that the server is prepared to accept.
373func MaxHeaderListSize(s uint32) ServerOption {
374 return newFuncServerOption(func(o *serverOptions) {
375 o.maxHeaderListSize = &s
376 })
377}
378
379// NewServer creates a gRPC server which has no service registered and has not
380// started to accept requests yet.
381func NewServer(opt ...ServerOption) *Server {
382 opts := defaultServerOptions
383 for _, o := range opt {
384 o.apply(&opts)
385 }
386 s := &Server{
387 lis: make(map[net.Listener]bool),
388 opts: opts,
389 conns: make(map[io.Closer]bool),
390 m: make(map[string]*service),
391 quit: make(chan struct{}),
392 done: make(chan struct{}),
393 czData: new(channelzData),
394 }
395 s.cv = sync.NewCond(&s.mu)
396 if EnableTracing {
397 _, file, line, _ := runtime.Caller(1)
398 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
399 }
400
401 if channelz.IsOn() {
402 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
403 }
404 return s
405}
406
407// printf records an event in s's event log, unless s has been stopped.
408// REQUIRES s.mu is held.
409func (s *Server) printf(format string, a ...interface{}) {
410 if s.events != nil {
411 s.events.Printf(format, a...)
412 }
413}
414
415// errorf records an error in s's event log, unless s has been stopped.
416// REQUIRES s.mu is held.
417func (s *Server) errorf(format string, a ...interface{}) {
418 if s.events != nil {
419 s.events.Errorf(format, a...)
420 }
421}
422
423// RegisterService registers a service and its implementation to the gRPC
424// server. It is called from the IDL generated code. This must be called before
425// invoking Serve.
426func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
427 ht := reflect.TypeOf(sd.HandlerType).Elem()
428 st := reflect.TypeOf(ss)
429 if !st.Implements(ht) {
430 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
431 }
432 s.register(sd, ss)
433}
434
435func (s *Server) register(sd *ServiceDesc, ss interface{}) {
436 s.mu.Lock()
437 defer s.mu.Unlock()
438 s.printf("RegisterService(%q)", sd.ServiceName)
439 if s.serve {
440 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
441 }
442 if _, ok := s.m[sd.ServiceName]; ok {
443 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
444 }
445 srv := &service{
446 server: ss,
447 md: make(map[string]*MethodDesc),
448 sd: make(map[string]*StreamDesc),
449 mdata: sd.Metadata,
450 }
451 for i := range sd.Methods {
452 d := &sd.Methods[i]
453 srv.md[d.MethodName] = d
454 }
455 for i := range sd.Streams {
456 d := &sd.Streams[i]
457 srv.sd[d.StreamName] = d
458 }
459 s.m[sd.ServiceName] = srv
460}
461
462// MethodInfo contains the information of an RPC including its method name and type.
463type MethodInfo struct {
464 // Name is the method name only, without the service name or package name.
465 Name string
466 // IsClientStream indicates whether the RPC is a client streaming RPC.
467 IsClientStream bool
468 // IsServerStream indicates whether the RPC is a server streaming RPC.
469 IsServerStream bool
470}
471
472// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
473type ServiceInfo struct {
474 Methods []MethodInfo
475 // Metadata is the metadata specified in ServiceDesc when registering service.
476 Metadata interface{}
477}
478
479// GetServiceInfo returns a map from service names to ServiceInfo.
480// Service names include the package names, in the form of <package>.<service>.
481func (s *Server) GetServiceInfo() map[string]ServiceInfo {
482 ret := make(map[string]ServiceInfo)
483 for n, srv := range s.m {
484 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
485 for m := range srv.md {
486 methods = append(methods, MethodInfo{
487 Name: m,
488 IsClientStream: false,
489 IsServerStream: false,
490 })
491 }
492 for m, d := range srv.sd {
493 methods = append(methods, MethodInfo{
494 Name: m,
495 IsClientStream: d.ClientStreams,
496 IsServerStream: d.ServerStreams,
497 })
498 }
499
500 ret[n] = ServiceInfo{
501 Methods: methods,
502 Metadata: srv.mdata,
503 }
504 }
505 return ret
506}
507
508// ErrServerStopped indicates that the operation is now illegal because of
509// the server being stopped.
510var ErrServerStopped = errors.New("grpc: the server has been stopped")
511
512func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
513 if s.opts.creds == nil {
514 return rawConn, nil, nil
515 }
516 return s.opts.creds.ServerHandshake(rawConn)
517}
518
519type listenSocket struct {
520 net.Listener
521 channelzID int64
522}
523
524func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
525 return &channelz.SocketInternalMetric{
526 SocketOptions: channelz.GetSocketOption(l.Listener),
527 LocalAddr: l.Listener.Addr(),
528 }
529}
530
531func (l *listenSocket) Close() error {
532 err := l.Listener.Close()
533 if channelz.IsOn() {
534 channelz.RemoveEntry(l.channelzID)
535 }
536 return err
537}
538
539// Serve accepts incoming connections on the listener lis, creating a new
540// ServerTransport and service goroutine for each. The service goroutines
541// read gRPC requests and then call the registered handlers to reply to them.
542// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
543// this method returns.
544// Serve will return a non-nil error unless Stop or GracefulStop is called.
545func (s *Server) Serve(lis net.Listener) error {
546 s.mu.Lock()
547 s.printf("serving")
548 s.serve = true
549 if s.lis == nil {
550 // Serve called after Stop or GracefulStop.
551 s.mu.Unlock()
552 lis.Close()
553 return ErrServerStopped
554 }
555
556 s.serveWG.Add(1)
557 defer func() {
558 s.serveWG.Done()
559 select {
560 // Stop or GracefulStop called; block until done and return nil.
561 case <-s.quit:
562 <-s.done
563 default:
564 }
565 }()
566
567 ls := &listenSocket{Listener: lis}
568 s.lis[ls] = true
569
570 if channelz.IsOn() {
571 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
572 }
573 s.mu.Unlock()
574
575 defer func() {
576 s.mu.Lock()
577 if s.lis != nil && s.lis[ls] {
578 ls.Close()
579 delete(s.lis, ls)
580 }
581 s.mu.Unlock()
582 }()
583
584 var tempDelay time.Duration // how long to sleep on accept failure
585
586 for {
587 rawConn, err := lis.Accept()
588 if err != nil {
589 if ne, ok := err.(interface {
590 Temporary() bool
591 }); ok && ne.Temporary() {
592 if tempDelay == 0 {
593 tempDelay = 5 * time.Millisecond
594 } else {
595 tempDelay *= 2
596 }
597 if max := 1 * time.Second; tempDelay > max {
598 tempDelay = max
599 }
600 s.mu.Lock()
601 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
602 s.mu.Unlock()
603 timer := time.NewTimer(tempDelay)
604 select {
605 case <-timer.C:
606 case <-s.quit:
607 timer.Stop()
608 return nil
609 }
610 continue
611 }
612 s.mu.Lock()
613 s.printf("done serving; Accept = %v", err)
614 s.mu.Unlock()
615
616 select {
617 case <-s.quit:
618 return nil
619 default:
620 }
621 return err
622 }
623 tempDelay = 0
624 // Start a new goroutine to deal with rawConn so we don't stall this Accept
625 // loop goroutine.
626 //
627 // Make sure we account for the goroutine so GracefulStop doesn't nil out
628 // s.conns before this conn can be added.
629 s.serveWG.Add(1)
630 go func() {
631 s.handleRawConn(rawConn)
632 s.serveWG.Done()
633 }()
634 }
635}
636
637// handleRawConn forks a goroutine to handle a just-accepted connection that
638// has not had any I/O performed on it yet.
639func (s *Server) handleRawConn(rawConn net.Conn) {
640 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
641 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
642 if err != nil {
643 // ErrConnDispatched means that the connection was dispatched away from
644 // gRPC; those connections should be left open.
645 if err != credentials.ErrConnDispatched {
646 s.mu.Lock()
647 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
648 s.mu.Unlock()
649 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
650 rawConn.Close()
651 }
652 rawConn.SetDeadline(time.Time{})
653 return
654 }
655
656 s.mu.Lock()
657 if s.conns == nil {
658 s.mu.Unlock()
659 conn.Close()
660 return
661 }
662 s.mu.Unlock()
663
664 // Finish handshaking (HTTP2)
665 st := s.newHTTP2Transport(conn, authInfo)
666 if st == nil {
667 return
668 }
669
670 rawConn.SetDeadline(time.Time{})
671 if !s.addConn(st) {
672 return
673 }
674 go func() {
675 s.serveStreams(st)
676 s.removeConn(st)
677 }()
678}
679
680// newHTTP2Transport sets up a http/2 transport (using the
681// gRPC http2 server transport in transport/http2_server.go).
682func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
683 config := &transport.ServerConfig{
684 MaxStreams: s.opts.maxConcurrentStreams,
685 AuthInfo: authInfo,
686 InTapHandle: s.opts.inTapHandle,
687 StatsHandler: s.opts.statsHandler,
688 KeepaliveParams: s.opts.keepaliveParams,
689 KeepalivePolicy: s.opts.keepalivePolicy,
690 InitialWindowSize: s.opts.initialWindowSize,
691 InitialConnWindowSize: s.opts.initialConnWindowSize,
692 WriteBufferSize: s.opts.writeBufferSize,
693 ReadBufferSize: s.opts.readBufferSize,
694 ChannelzParentID: s.channelzID,
695 MaxHeaderListSize: s.opts.maxHeaderListSize,
696 }
697 st, err := transport.NewServerTransport("http2", c, config)
698 if err != nil {
699 s.mu.Lock()
700 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
701 s.mu.Unlock()
702 c.Close()
703 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
704 return nil
705 }
706
707 return st
708}
709
710func (s *Server) serveStreams(st transport.ServerTransport) {
711 defer st.Close()
712 var wg sync.WaitGroup
713 st.HandleStreams(func(stream *transport.Stream) {
714 wg.Add(1)
715 go func() {
716 defer wg.Done()
717 s.handleStream(st, stream, s.traceInfo(st, stream))
718 }()
719 }, func(ctx context.Context, method string) context.Context {
720 if !EnableTracing {
721 return ctx
722 }
723 tr := trace.New("grpc.Recv."+methodFamily(method), method)
724 return trace.NewContext(ctx, tr)
725 })
726 wg.Wait()
727}
728
729var _ http.Handler = (*Server)(nil)
730
731// ServeHTTP implements the Go standard library's http.Handler
732// interface by responding to the gRPC request r, by looking up
733// the requested gRPC method in the gRPC server s.
734//
735// The provided HTTP request must have arrived on an HTTP/2
736// connection. When using the Go standard library's server,
737// practically this means that the Request must also have arrived
738// over TLS.
739//
740// To share one port (such as 443 for https) between gRPC and an
741// existing http.Handler, use a root http.Handler such as:
742//
743// if r.ProtoMajor == 2 && strings.HasPrefix(
744// r.Header.Get("Content-Type"), "application/grpc") {
745// grpcServer.ServeHTTP(w, r)
746// } else {
747// yourMux.ServeHTTP(w, r)
748// }
749//
750// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
751// separate from grpc-go's HTTP/2 server. Performance and features may vary
752// between the two paths. ServeHTTP does not support some gRPC features
753// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
754// and subject to change.
755func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
756 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
757 if err != nil {
758 http.Error(w, err.Error(), http.StatusInternalServerError)
759 return
760 }
761 if !s.addConn(st) {
762 return
763 }
764 defer s.removeConn(st)
765 s.serveStreams(st)
766}
767
768// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
769// If tracing is not enabled, it returns nil.
770func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
771 tr, ok := trace.FromContext(stream.Context())
772 if !ok {
773 return nil
774 }
775
776 trInfo = &traceInfo{
777 tr: tr,
778 firstLine: firstLine{
779 client: false,
780 remoteAddr: st.RemoteAddr(),
781 },
782 }
783 if dl, ok := stream.Context().Deadline(); ok {
784 trInfo.firstLine.deadline = time.Until(dl)
785 }
786 return trInfo
787}
788
789func (s *Server) addConn(c io.Closer) bool {
790 s.mu.Lock()
791 defer s.mu.Unlock()
792 if s.conns == nil {
793 c.Close()
794 return false
795 }
796 if s.drain {
797 // Transport added after we drained our existing conns: drain it
798 // immediately.
799 c.(transport.ServerTransport).Drain()
800 }
801 s.conns[c] = true
802 return true
803}
804
805func (s *Server) removeConn(c io.Closer) {
806 s.mu.Lock()
807 defer s.mu.Unlock()
808 if s.conns != nil {
809 delete(s.conns, c)
810 s.cv.Broadcast()
811 }
812}
813
814func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
815 return &channelz.ServerInternalMetric{
816 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
817 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
818 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
819 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
820 }
821}
822
823func (s *Server) incrCallsStarted() {
824 atomic.AddInt64(&s.czData.callsStarted, 1)
825 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
826}
827
828func (s *Server) incrCallsSucceeded() {
829 atomic.AddInt64(&s.czData.callsSucceeded, 1)
830}
831
832func (s *Server) incrCallsFailed() {
833 atomic.AddInt64(&s.czData.callsFailed, 1)
834}
835
836func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
837 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
838 if err != nil {
839 grpclog.Errorln("grpc: server failed to encode response: ", err)
840 return err
841 }
842 compData, err := compress(data, cp, comp)
843 if err != nil {
844 grpclog.Errorln("grpc: server failed to compress response: ", err)
845 return err
846 }
847 hdr, payload := msgHeader(data, compData)
848 // TODO(dfawley): should we be checking len(data) instead?
849 if len(payload) > s.opts.maxSendMessageSize {
850 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
851 }
852 err = t.Write(stream, hdr, payload, opts)
853 if err == nil && s.opts.statsHandler != nil {
854 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
855 }
856 return err
857}
858
859func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
860 if channelz.IsOn() {
861 s.incrCallsStarted()
862 defer func() {
863 if err != nil && err != io.EOF {
864 s.incrCallsFailed()
865 } else {
866 s.incrCallsSucceeded()
867 }
868 }()
869 }
870 sh := s.opts.statsHandler
871 if sh != nil {
872 beginTime := time.Now()
873 begin := &stats.Begin{
874 BeginTime: beginTime,
875 }
876 sh.HandleRPC(stream.Context(), begin)
877 defer func() {
878 end := &stats.End{
879 BeginTime: beginTime,
880 EndTime: time.Now(),
881 }
882 if err != nil && err != io.EOF {
883 end.Error = toRPCErr(err)
884 }
885 sh.HandleRPC(stream.Context(), end)
886 }()
887 }
888 if trInfo != nil {
889 defer trInfo.tr.Finish()
890 trInfo.tr.LazyLog(&trInfo.firstLine, false)
891 defer func() {
892 if err != nil && err != io.EOF {
893 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
894 trInfo.tr.SetError()
895 }
896 }()
897 }
898
899 binlog := binarylog.GetMethodLogger(stream.Method())
900 if binlog != nil {
901 ctx := stream.Context()
902 md, _ := metadata.FromIncomingContext(ctx)
903 logEntry := &binarylog.ClientHeader{
904 Header: md,
905 MethodName: stream.Method(),
906 PeerAddr: nil,
907 }
908 if deadline, ok := ctx.Deadline(); ok {
909 logEntry.Timeout = time.Until(deadline)
910 if logEntry.Timeout < 0 {
911 logEntry.Timeout = 0
912 }
913 }
914 if a := md[":authority"]; len(a) > 0 {
915 logEntry.Authority = a[0]
916 }
917 if peer, ok := peer.FromContext(ctx); ok {
918 logEntry.PeerAddr = peer.Addr
919 }
920 binlog.Log(logEntry)
921 }
922
923 // comp and cp are used for compression. decomp and dc are used for
924 // decompression. If comp and decomp are both set, they are the same;
925 // however they are kept separate to ensure that at most one of the
926 // compressor/decompressor variable pairs are set for use later.
927 var comp, decomp encoding.Compressor
928 var cp Compressor
929 var dc Decompressor
930
931 // If dc is set and matches the stream's compression, use it. Otherwise, try
932 // to find a matching registered compressor for decomp.
933 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
934 dc = s.opts.dc
935 } else if rc != "" && rc != encoding.Identity {
936 decomp = encoding.GetCompressor(rc)
937 if decomp == nil {
938 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
939 t.WriteStatus(stream, st)
940 return st.Err()
941 }
942 }
943
944 // If cp is set, use it. Otherwise, attempt to compress the response using
945 // the incoming message compression method.
946 //
947 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
948 if s.opts.cp != nil {
949 cp = s.opts.cp
950 stream.SetSendCompress(cp.Type())
951 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
952 // Legacy compressor not specified; attempt to respond with same encoding.
953 comp = encoding.GetCompressor(rc)
954 if comp != nil {
955 stream.SetSendCompress(rc)
956 }
957 }
958
959 var payInfo *payloadInfo
960 if sh != nil || binlog != nil {
961 payInfo = &payloadInfo{}
962 }
963 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
964 if err != nil {
965 if st, ok := status.FromError(err); ok {
966 if e := t.WriteStatus(stream, st); e != nil {
967 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
968 }
969 }
970 return err
971 }
972 if channelz.IsOn() {
973 t.IncrMsgRecv()
974 }
975 df := func(v interface{}) error {
976 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
977 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
978 }
979 if sh != nil {
980 sh.HandleRPC(stream.Context(), &stats.InPayload{
981 RecvTime: time.Now(),
982 Payload: v,
983 Data: d,
984 Length: len(d),
985 })
986 }
987 if binlog != nil {
988 binlog.Log(&binarylog.ClientMessage{
989 Message: d,
990 })
991 }
992 if trInfo != nil {
993 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
994 }
995 return nil
996 }
997 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
998 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
999 if appErr != nil {
1000 appStatus, ok := status.FromError(appErr)
1001 if !ok {
1002 // Convert appErr if it is not a grpc status error.
1003 appErr = status.Error(codes.Unknown, appErr.Error())
1004 appStatus, _ = status.FromError(appErr)
1005 }
1006 if trInfo != nil {
1007 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1008 trInfo.tr.SetError()
1009 }
1010 if e := t.WriteStatus(stream, appStatus); e != nil {
1011 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1012 }
1013 if binlog != nil {
1014 if h, _ := stream.Header(); h.Len() > 0 {
1015 // Only log serverHeader if there was header. Otherwise it can
1016 // be trailer only.
1017 binlog.Log(&binarylog.ServerHeader{
1018 Header: h,
1019 })
1020 }
1021 binlog.Log(&binarylog.ServerTrailer{
1022 Trailer: stream.Trailer(),
1023 Err: appErr,
1024 })
1025 }
1026 return appErr
1027 }
1028 if trInfo != nil {
1029 trInfo.tr.LazyLog(stringer("OK"), false)
1030 }
1031 opts := &transport.Options{Last: true}
1032
1033 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1034 if err == io.EOF {
1035 // The entire stream is done (for unary RPC only).
1036 return err
1037 }
1038 if s, ok := status.FromError(err); ok {
1039 if e := t.WriteStatus(stream, s); e != nil {
1040 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1041 }
1042 } else {
1043 switch st := err.(type) {
1044 case transport.ConnectionError:
1045 // Nothing to do here.
1046 default:
1047 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1048 }
1049 }
1050 if binlog != nil {
1051 h, _ := stream.Header()
1052 binlog.Log(&binarylog.ServerHeader{
1053 Header: h,
1054 })
1055 binlog.Log(&binarylog.ServerTrailer{
1056 Trailer: stream.Trailer(),
1057 Err: appErr,
1058 })
1059 }
1060 return err
1061 }
1062 if binlog != nil {
1063 h, _ := stream.Header()
1064 binlog.Log(&binarylog.ServerHeader{
1065 Header: h,
1066 })
1067 binlog.Log(&binarylog.ServerMessage{
1068 Message: reply,
1069 })
1070 }
1071 if channelz.IsOn() {
1072 t.IncrMsgSent()
1073 }
1074 if trInfo != nil {
1075 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1076 }
1077 // TODO: Should we be logging if writing status failed here, like above?
1078 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1079 // error or allow the stats handler to see it?
1080 err = t.WriteStatus(stream, status.New(codes.OK, ""))
1081 if binlog != nil {
1082 binlog.Log(&binarylog.ServerTrailer{
1083 Trailer: stream.Trailer(),
1084 Err: appErr,
1085 })
1086 }
1087 return err
1088}
1089
1090func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1091 if channelz.IsOn() {
1092 s.incrCallsStarted()
1093 defer func() {
1094 if err != nil && err != io.EOF {
1095 s.incrCallsFailed()
1096 } else {
1097 s.incrCallsSucceeded()
1098 }
1099 }()
1100 }
1101 sh := s.opts.statsHandler
1102 if sh != nil {
1103 beginTime := time.Now()
1104 begin := &stats.Begin{
1105 BeginTime: beginTime,
1106 }
1107 sh.HandleRPC(stream.Context(), begin)
1108 defer func() {
1109 end := &stats.End{
1110 BeginTime: beginTime,
1111 EndTime: time.Now(),
1112 }
1113 if err != nil && err != io.EOF {
1114 end.Error = toRPCErr(err)
1115 }
1116 sh.HandleRPC(stream.Context(), end)
1117 }()
1118 }
1119 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1120 ss := &serverStream{
1121 ctx: ctx,
1122 t: t,
1123 s: stream,
1124 p: &parser{r: stream},
1125 codec: s.getCodec(stream.ContentSubtype()),
1126 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1127 maxSendMessageSize: s.opts.maxSendMessageSize,
1128 trInfo: trInfo,
1129 statsHandler: sh,
1130 }
1131
1132 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1133 if ss.binlog != nil {
1134 md, _ := metadata.FromIncomingContext(ctx)
1135 logEntry := &binarylog.ClientHeader{
1136 Header: md,
1137 MethodName: stream.Method(),
1138 PeerAddr: nil,
1139 }
1140 if deadline, ok := ctx.Deadline(); ok {
1141 logEntry.Timeout = time.Until(deadline)
1142 if logEntry.Timeout < 0 {
1143 logEntry.Timeout = 0
1144 }
1145 }
1146 if a := md[":authority"]; len(a) > 0 {
1147 logEntry.Authority = a[0]
1148 }
1149 if peer, ok := peer.FromContext(ss.Context()); ok {
1150 logEntry.PeerAddr = peer.Addr
1151 }
1152 ss.binlog.Log(logEntry)
1153 }
1154
1155 // If dc is set and matches the stream's compression, use it. Otherwise, try
1156 // to find a matching registered compressor for decomp.
1157 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1158 ss.dc = s.opts.dc
1159 } else if rc != "" && rc != encoding.Identity {
1160 ss.decomp = encoding.GetCompressor(rc)
1161 if ss.decomp == nil {
1162 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1163 t.WriteStatus(ss.s, st)
1164 return st.Err()
1165 }
1166 }
1167
1168 // If cp is set, use it. Otherwise, attempt to compress the response using
1169 // the incoming message compression method.
1170 //
1171 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1172 if s.opts.cp != nil {
1173 ss.cp = s.opts.cp
1174 stream.SetSendCompress(s.opts.cp.Type())
1175 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1176 // Legacy compressor not specified; attempt to respond with same encoding.
1177 ss.comp = encoding.GetCompressor(rc)
1178 if ss.comp != nil {
1179 stream.SetSendCompress(rc)
1180 }
1181 }
1182
1183 if trInfo != nil {
1184 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1185 defer func() {
1186 ss.mu.Lock()
1187 if err != nil && err != io.EOF {
1188 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1189 ss.trInfo.tr.SetError()
1190 }
1191 ss.trInfo.tr.Finish()
1192 ss.trInfo.tr = nil
1193 ss.mu.Unlock()
1194 }()
1195 }
1196 var appErr error
1197 var server interface{}
1198 if srv != nil {
1199 server = srv.server
1200 }
1201 if s.opts.streamInt == nil {
1202 appErr = sd.Handler(server, ss)
1203 } else {
1204 info := &StreamServerInfo{
1205 FullMethod: stream.Method(),
1206 IsClientStream: sd.ClientStreams,
1207 IsServerStream: sd.ServerStreams,
1208 }
1209 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1210 }
1211 if appErr != nil {
1212 appStatus, ok := status.FromError(appErr)
1213 if !ok {
1214 appStatus = status.New(codes.Unknown, appErr.Error())
1215 appErr = appStatus.Err()
1216 }
1217 if trInfo != nil {
1218 ss.mu.Lock()
1219 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1220 ss.trInfo.tr.SetError()
1221 ss.mu.Unlock()
1222 }
1223 t.WriteStatus(ss.s, appStatus)
1224 if ss.binlog != nil {
1225 ss.binlog.Log(&binarylog.ServerTrailer{
1226 Trailer: ss.s.Trailer(),
1227 Err: appErr,
1228 })
1229 }
1230 // TODO: Should we log an error from WriteStatus here and below?
1231 return appErr
1232 }
1233 if trInfo != nil {
1234 ss.mu.Lock()
1235 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1236 ss.mu.Unlock()
1237 }
1238 err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
1239 if ss.binlog != nil {
1240 ss.binlog.Log(&binarylog.ServerTrailer{
1241 Trailer: ss.s.Trailer(),
1242 Err: appErr,
1243 })
1244 }
1245 return err
1246}
1247
1248func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1249 sm := stream.Method()
1250 if sm != "" && sm[0] == '/' {
1251 sm = sm[1:]
1252 }
1253 pos := strings.LastIndex(sm, "/")
1254 if pos == -1 {
1255 if trInfo != nil {
1256 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1257 trInfo.tr.SetError()
1258 }
1259 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1260 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1261 if trInfo != nil {
1262 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1263 trInfo.tr.SetError()
1264 }
1265 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1266 }
1267 if trInfo != nil {
1268 trInfo.tr.Finish()
1269 }
1270 return
1271 }
1272 service := sm[:pos]
1273 method := sm[pos+1:]
1274
1275 srv, knownService := s.m[service]
1276 if knownService {
1277 if md, ok := srv.md[method]; ok {
1278 s.processUnaryRPC(t, stream, srv, md, trInfo)
1279 return
1280 }
1281 if sd, ok := srv.sd[method]; ok {
1282 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1283 return
1284 }
1285 }
1286 // Unknown service, or known server unknown method.
1287 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1288 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1289 return
1290 }
1291 var errDesc string
1292 if !knownService {
1293 errDesc = fmt.Sprintf("unknown service %v", service)
1294 } else {
1295 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1296 }
1297 if trInfo != nil {
1298 trInfo.tr.LazyPrintf("%s", errDesc)
1299 trInfo.tr.SetError()
1300 }
1301 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1302 if trInfo != nil {
1303 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1304 trInfo.tr.SetError()
1305 }
1306 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1307 }
1308 if trInfo != nil {
1309 trInfo.tr.Finish()
1310 }
1311}
1312
1313// The key to save ServerTransportStream in the context.
1314type streamKey struct{}
1315
1316// NewContextWithServerTransportStream creates a new context from ctx and
1317// attaches stream to it.
1318//
1319// This API is EXPERIMENTAL.
1320func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1321 return context.WithValue(ctx, streamKey{}, stream)
1322}
1323
1324// ServerTransportStream is a minimal interface that a transport stream must
1325// implement. This can be used to mock an actual transport stream for tests of
1326// handler code that use, for example, grpc.SetHeader (which requires some
1327// stream to be in context).
1328//
1329// See also NewContextWithServerTransportStream.
1330//
1331// This API is EXPERIMENTAL.
1332type ServerTransportStream interface {
1333 Method() string
1334 SetHeader(md metadata.MD) error
1335 SendHeader(md metadata.MD) error
1336 SetTrailer(md metadata.MD) error
1337}
1338
1339// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1340// ctx. Returns nil if the given context has no stream associated with it
1341// (which implies it is not an RPC invocation context).
1342//
1343// This API is EXPERIMENTAL.
1344func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1345 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1346 return s
1347}
1348
1349// Stop stops the gRPC server. It immediately closes all open
1350// connections and listeners.
1351// It cancels all active RPCs on the server side and the corresponding
1352// pending RPCs on the client side will get notified by connection
1353// errors.
1354func (s *Server) Stop() {
1355 s.quitOnce.Do(func() {
1356 close(s.quit)
1357 })
1358
1359 defer func() {
1360 s.serveWG.Wait()
1361 s.doneOnce.Do(func() {
1362 close(s.done)
1363 })
1364 }()
1365
1366 s.channelzRemoveOnce.Do(func() {
1367 if channelz.IsOn() {
1368 channelz.RemoveEntry(s.channelzID)
1369 }
1370 })
1371
1372 s.mu.Lock()
1373 listeners := s.lis
1374 s.lis = nil
1375 st := s.conns
1376 s.conns = nil
1377 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1378 s.cv.Broadcast()
1379 s.mu.Unlock()
1380
1381 for lis := range listeners {
1382 lis.Close()
1383 }
1384 for c := range st {
1385 c.Close()
1386 }
1387
1388 s.mu.Lock()
1389 if s.events != nil {
1390 s.events.Finish()
1391 s.events = nil
1392 }
1393 s.mu.Unlock()
1394}
1395
1396// GracefulStop stops the gRPC server gracefully. It stops the server from
1397// accepting new connections and RPCs and blocks until all the pending RPCs are
1398// finished.
1399func (s *Server) GracefulStop() {
1400 s.quitOnce.Do(func() {
1401 close(s.quit)
1402 })
1403
1404 defer func() {
1405 s.doneOnce.Do(func() {
1406 close(s.done)
1407 })
1408 }()
1409
1410 s.channelzRemoveOnce.Do(func() {
1411 if channelz.IsOn() {
1412 channelz.RemoveEntry(s.channelzID)
1413 }
1414 })
1415 s.mu.Lock()
1416 if s.conns == nil {
1417 s.mu.Unlock()
1418 return
1419 }
1420
1421 for lis := range s.lis {
1422 lis.Close()
1423 }
1424 s.lis = nil
1425 if !s.drain {
1426 for c := range s.conns {
1427 c.(transport.ServerTransport).Drain()
1428 }
1429 s.drain = true
1430 }
1431
1432 // Wait for serving threads to be ready to exit. Only then can we be sure no
1433 // new conns will be created.
1434 s.mu.Unlock()
1435 s.serveWG.Wait()
1436 s.mu.Lock()
1437
1438 for len(s.conns) != 0 {
1439 s.cv.Wait()
1440 }
1441 s.conns = nil
1442 if s.events != nil {
1443 s.events.Finish()
1444 s.events = nil
1445 }
1446 s.mu.Unlock()
1447}
1448
1449// contentSubtype must be lowercase
1450// cannot return nil
1451func (s *Server) getCodec(contentSubtype string) baseCodec {
1452 if s.opts.codec != nil {
1453 return s.opts.codec
1454 }
1455 if contentSubtype == "" {
1456 return encoding.GetCodec(proto.Name)
1457 }
1458 codec := encoding.GetCodec(contentSubtype)
1459 if codec == nil {
1460 return encoding.GetCodec(proto.Name)
1461 }
1462 return codec
1463}
1464
1465// SetHeader sets the header metadata.
1466// When called multiple times, all the provided metadata will be merged.
1467// All the metadata will be sent out when one of the following happens:
1468// - grpc.SendHeader() is called;
1469// - The first response is sent out;
1470// - An RPC status is sent out (error or success).
1471func SetHeader(ctx context.Context, md metadata.MD) error {
1472 if md.Len() == 0 {
1473 return nil
1474 }
1475 stream := ServerTransportStreamFromContext(ctx)
1476 if stream == nil {
1477 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1478 }
1479 return stream.SetHeader(md)
1480}
1481
1482// SendHeader sends header metadata. It may be called at most once.
1483// The provided md and headers set by SetHeader() will be sent.
1484func SendHeader(ctx context.Context, md metadata.MD) error {
1485 stream := ServerTransportStreamFromContext(ctx)
1486 if stream == nil {
1487 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1488 }
1489 if err := stream.SendHeader(md); err != nil {
1490 return toRPCErr(err)
1491 }
1492 return nil
1493}
1494
1495// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1496// When called more than once, all the provided metadata will be merged.
1497func SetTrailer(ctx context.Context, md metadata.MD) error {
1498 if md.Len() == 0 {
1499 return nil
1500 }
1501 stream := ServerTransportStreamFromContext(ctx)
1502 if stream == nil {
1503 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1504 }
1505 return stream.SetTrailer(md)
1506}
1507
1508// Method returns the method string for the server context. The returned
1509// string is in the format of "/service/method".
1510func Method(ctx context.Context) (string, bool) {
1511 s := ServerTransportStreamFromContext(ctx)
1512 if s == nil {
1513 return "", false
1514 }
1515 return s.Method(), true
1516}
1517
1518type channelzServer struct {
1519 s *Server
1520}
1521
1522func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1523 return c.s.channelzMetric()
1524}