/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/transport"
46 "google.golang.org/grpc/keepalive"
47 "google.golang.org/grpc/metadata"
48 "google.golang.org/grpc/peer"
49 "google.golang.org/grpc/stats"
50 "google.golang.org/grpc/status"
51 "google.golang.org/grpc/tap"
52)
53
// Default message size limits used by defaultServerOptions.
const (
	// defaultServerMaxReceiveMessageSize is the default cap on a received
	// message: 4 MiB.
	defaultServerMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultServerMaxSendMessageSize is the default cap on a sent message.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
58
59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
61// MethodDesc represents an RPC service's method specification.
62type MethodDesc struct {
63 MethodName string
64 Handler methodHandler
65}
66
67// ServiceDesc represents an RPC service's specification.
68type ServiceDesc struct {
69 ServiceName string
70 // The pointer to the service interface. Used to check whether the user
71 // provided implementation satisfies the interface requirements.
72 HandlerType interface{}
73 Methods []MethodDesc
74 Streams []StreamDesc
75 Metadata interface{}
76}
77
78// service consists of the information of the server serving this service and
79// the methods in this service.
80type service struct {
81 server interface{} // the server for service methods
82 md map[string]*MethodDesc
83 sd map[string]*StreamDesc
84 mdata interface{}
85}
86
87// Server is a gRPC server to serve RPC requests.
88type Server struct {
89 opts serverOptions
90
91 mu sync.Mutex // guards following
92 lis map[net.Listener]bool
93 conns map[transport.ServerTransport]bool
94 serve bool
95 drain bool
96 cv *sync.Cond // signaled when connections close for GracefulStop
97 m map[string]*service // service name -> service info
98 events trace.EventLog
99
100 quit chan struct{}
101 done chan struct{}
102 quitOnce sync.Once
103 doneOnce sync.Once
104 channelzRemoveOnce sync.Once
105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107 channelzID int64 // channelz unique identification number
108 czData *channelzData
109}
110
111type serverOptions struct {
112 creds credentials.TransportCredentials
113 codec baseCodec
114 cp Compressor
115 dc Decompressor
116 unaryInt UnaryServerInterceptor
117 streamInt StreamServerInterceptor
118 inTapHandle tap.ServerInHandle
119 statsHandler stats.Handler
120 maxConcurrentStreams uint32
121 maxReceiveMessageSize int
122 maxSendMessageSize int
123 unknownStreamDesc *StreamDesc
124 keepaliveParams keepalive.ServerParameters
125 keepalivePolicy keepalive.EnforcementPolicy
126 initialWindowSize int32
127 initialConnWindowSize int32
128 writeBufferSize int
129 readBufferSize int
130 connectionTimeout time.Duration
131 maxHeaderListSize *uint32
132}
133
134var defaultServerOptions = serverOptions{
135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
136 maxSendMessageSize: defaultServerMaxSendMessageSize,
137 connectionTimeout: 120 * time.Second,
138 writeBufferSize: defaultWriteBufSize,
139 readBufferSize: defaultReadBufSize,
140}
141
142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
143type ServerOption interface {
144 apply(*serverOptions)
145}
146
147// EmptyServerOption does not alter the server configuration. It can be embedded
148// in another structure to build custom server options.
149//
150// This API is EXPERIMENTAL.
151type EmptyServerOption struct{}
152
153func (EmptyServerOption) apply(*serverOptions) {}
154
155// funcServerOption wraps a function that modifies serverOptions into an
156// implementation of the ServerOption interface.
157type funcServerOption struct {
158 f func(*serverOptions)
159}
160
161func (fdo *funcServerOption) apply(do *serverOptions) {
162 fdo.f(do)
163}
164
165func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
166 return &funcServerOption{
167 f: f,
168 }
169}
170
171// WriteBufferSize determines how much data can be batched before doing a write on the wire.
172// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
173// The default value for this buffer is 32KB.
174// Zero will disable the write buffer such that each write will be on underlying connection.
175// Note: A Send call may not directly translate to a write.
176func WriteBufferSize(s int) ServerOption {
177 return newFuncServerOption(func(o *serverOptions) {
178 o.writeBufferSize = s
179 })
180}
181
182// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
183// for one read syscall.
184// The default value for this buffer is 32KB.
185// Zero will disable read buffer for a connection so data framer can access the underlying
186// conn directly.
187func ReadBufferSize(s int) ServerOption {
188 return newFuncServerOption(func(o *serverOptions) {
189 o.readBufferSize = s
190 })
191}
192
193// InitialWindowSize returns a ServerOption that sets window size for stream.
194// The lower bound for window size is 64K and any value smaller than that will be ignored.
195func InitialWindowSize(s int32) ServerOption {
196 return newFuncServerOption(func(o *serverOptions) {
197 o.initialWindowSize = s
198 })
199}
200
201// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
202// The lower bound for window size is 64K and any value smaller than that will be ignored.
203func InitialConnWindowSize(s int32) ServerOption {
204 return newFuncServerOption(func(o *serverOptions) {
205 o.initialConnWindowSize = s
206 })
207}
208
209// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
210func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
211 if kp.Time > 0 && kp.Time < time.Second {
212 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
213 kp.Time = time.Second
214 }
215
216 return newFuncServerOption(func(o *serverOptions) {
217 o.keepaliveParams = kp
218 })
219}
220
221// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
222func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
223 return newFuncServerOption(func(o *serverOptions) {
224 o.keepalivePolicy = kep
225 })
226}
227
228// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
229//
230// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
231func CustomCodec(codec Codec) ServerOption {
232 return newFuncServerOption(func(o *serverOptions) {
233 o.codec = codec
234 })
235}
236
237// RPCCompressor returns a ServerOption that sets a compressor for outbound
238// messages. For backward compatibility, all outbound messages will be sent
239// using this compressor, regardless of incoming message compression. By
240// default, server messages will be sent using the same compressor with which
241// request messages were sent.
242//
243// Deprecated: use encoding.RegisterCompressor instead.
244func RPCCompressor(cp Compressor) ServerOption {
245 return newFuncServerOption(func(o *serverOptions) {
246 o.cp = cp
247 })
248}
249
250// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
251// messages. It has higher priority than decompressors registered via
252// encoding.RegisterCompressor.
253//
254// Deprecated: use encoding.RegisterCompressor instead.
255func RPCDecompressor(dc Decompressor) ServerOption {
256 return newFuncServerOption(func(o *serverOptions) {
257 o.dc = dc
258 })
259}
260
261// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
262// If this is not set, gRPC uses the default limit.
263//
264// Deprecated: use MaxRecvMsgSize instead.
265func MaxMsgSize(m int) ServerOption {
266 return MaxRecvMsgSize(m)
267}
268
269// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
270// If this is not set, gRPC uses the default 4MB.
271func MaxRecvMsgSize(m int) ServerOption {
272 return newFuncServerOption(func(o *serverOptions) {
273 o.maxReceiveMessageSize = m
274 })
275}
276
277// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
278// If this is not set, gRPC uses the default `math.MaxInt32`.
279func MaxSendMsgSize(m int) ServerOption {
280 return newFuncServerOption(func(o *serverOptions) {
281 o.maxSendMessageSize = m
282 })
283}
284
285// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
286// of concurrent streams to each ServerTransport.
287func MaxConcurrentStreams(n uint32) ServerOption {
288 return newFuncServerOption(func(o *serverOptions) {
289 o.maxConcurrentStreams = n
290 })
291}
292
293// Creds returns a ServerOption that sets credentials for server connections.
294func Creds(c credentials.TransportCredentials) ServerOption {
295 return newFuncServerOption(func(o *serverOptions) {
296 o.creds = c
297 })
298}
299
300// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
301// server. Only one unary interceptor can be installed. The construction of multiple
302// interceptors (e.g., chaining) can be implemented at the caller.
303func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
304 return newFuncServerOption(func(o *serverOptions) {
305 if o.unaryInt != nil {
306 panic("The unary server interceptor was already set and may not be reset.")
307 }
308 o.unaryInt = i
309 })
310}
311
312// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
313// server. Only one stream interceptor can be installed.
314func StreamInterceptor(i StreamServerInterceptor) ServerOption {
315 return newFuncServerOption(func(o *serverOptions) {
316 if o.streamInt != nil {
317 panic("The stream server interceptor was already set and may not be reset.")
318 }
319 o.streamInt = i
320 })
321}
322
323// InTapHandle returns a ServerOption that sets the tap handle for all the server
324// transport to be created. Only one can be installed.
325func InTapHandle(h tap.ServerInHandle) ServerOption {
326 return newFuncServerOption(func(o *serverOptions) {
327 if o.inTapHandle != nil {
328 panic("The tap handle was already set and may not be reset.")
329 }
330 o.inTapHandle = h
331 })
332}
333
334// StatsHandler returns a ServerOption that sets the stats handler for the server.
335func StatsHandler(h stats.Handler) ServerOption {
336 return newFuncServerOption(func(o *serverOptions) {
337 o.statsHandler = h
338 })
339}
340
341// UnknownServiceHandler returns a ServerOption that allows for adding a custom
342// unknown service handler. The provided method is a bidi-streaming RPC service
343// handler that will be invoked instead of returning the "unimplemented" gRPC
344// error whenever a request is received for an unregistered service or method.
345// The handling function has full access to the Context of the request and the
346// stream, and the invocation bypasses interceptors.
347func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
348 return newFuncServerOption(func(o *serverOptions) {
349 o.unknownStreamDesc = &StreamDesc{
350 StreamName: "unknown_service_handler",
351 Handler: streamHandler,
352 // We need to assume that the users of the streamHandler will want to use both.
353 ClientStreams: true,
354 ServerStreams: true,
355 }
356 })
357}
358
359// ConnectionTimeout returns a ServerOption that sets the timeout for
360// connection establishment (up to and including HTTP/2 handshaking) for all
361// new connections. If this is not set, the default is 120 seconds. A zero or
362// negative value will result in an immediate timeout.
363//
364// This API is EXPERIMENTAL.
365func ConnectionTimeout(d time.Duration) ServerOption {
366 return newFuncServerOption(func(o *serverOptions) {
367 o.connectionTimeout = d
368 })
369}
370
371// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
372// of header list that the server is prepared to accept.
373func MaxHeaderListSize(s uint32) ServerOption {
374 return newFuncServerOption(func(o *serverOptions) {
375 o.maxHeaderListSize = &s
376 })
377}
378
379// NewServer creates a gRPC server which has no service registered and has not
380// started to accept requests yet.
381func NewServer(opt ...ServerOption) *Server {
382 opts := defaultServerOptions
383 for _, o := range opt {
384 o.apply(&opts)
385 }
386 s := &Server{
387 lis: make(map[net.Listener]bool),
388 opts: opts,
389 conns: make(map[transport.ServerTransport]bool),
390 m: make(map[string]*service),
391 quit: make(chan struct{}),
392 done: make(chan struct{}),
393 czData: new(channelzData),
394 }
395 s.cv = sync.NewCond(&s.mu)
396 if EnableTracing {
397 _, file, line, _ := runtime.Caller(1)
398 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
399 }
400
401 if channelz.IsOn() {
402 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
403 }
404 return s
405}
406
407// printf records an event in s's event log, unless s has been stopped.
408// REQUIRES s.mu is held.
409func (s *Server) printf(format string, a ...interface{}) {
410 if s.events != nil {
411 s.events.Printf(format, a...)
412 }
413}
414
415// errorf records an error in s's event log, unless s has been stopped.
416// REQUIRES s.mu is held.
417func (s *Server) errorf(format string, a ...interface{}) {
418 if s.events != nil {
419 s.events.Errorf(format, a...)
420 }
421}
422
423// RegisterService registers a service and its implementation to the gRPC
424// server. It is called from the IDL generated code. This must be called before
425// invoking Serve.
426func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
427 ht := reflect.TypeOf(sd.HandlerType).Elem()
428 st := reflect.TypeOf(ss)
429 if !st.Implements(ht) {
430 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
431 }
432 s.register(sd, ss)
433}
434
435func (s *Server) register(sd *ServiceDesc, ss interface{}) {
436 s.mu.Lock()
437 defer s.mu.Unlock()
438 s.printf("RegisterService(%q)", sd.ServiceName)
439 if s.serve {
440 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
441 }
442 if _, ok := s.m[sd.ServiceName]; ok {
443 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
444 }
445 srv := &service{
446 server: ss,
447 md: make(map[string]*MethodDesc),
448 sd: make(map[string]*StreamDesc),
449 mdata: sd.Metadata,
450 }
451 for i := range sd.Methods {
452 d := &sd.Methods[i]
453 srv.md[d.MethodName] = d
454 }
455 for i := range sd.Streams {
456 d := &sd.Streams[i]
457 srv.sd[d.StreamName] = d
458 }
459 s.m[sd.ServiceName] = srv
460}
461
// MethodInfo describes a single RPC: its method name and its streaming type.
type MethodInfo struct {
	// Name is the bare method name, with no service or package name.
	Name string
	// IsClientStream reports whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream reports whether the RPC is a server streaming RPC.
	IsServerStream bool
}
471
472// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
473type ServiceInfo struct {
474 Methods []MethodInfo
475 // Metadata is the metadata specified in ServiceDesc when registering service.
476 Metadata interface{}
477}
478
479// GetServiceInfo returns a map from service names to ServiceInfo.
480// Service names include the package names, in the form of <package>.<service>.
481func (s *Server) GetServiceInfo() map[string]ServiceInfo {
482 ret := make(map[string]ServiceInfo)
483 for n, srv := range s.m {
484 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
485 for m := range srv.md {
486 methods = append(methods, MethodInfo{
487 Name: m,
488 IsClientStream: false,
489 IsServerStream: false,
490 })
491 }
492 for m, d := range srv.sd {
493 methods = append(methods, MethodInfo{
494 Name: m,
495 IsClientStream: d.ClientStreams,
496 IsServerStream: d.ServerStreams,
497 })
498 }
499
500 ret[n] = ServiceInfo{
501 Methods: methods,
502 Metadata: srv.mdata,
503 }
504 }
505 return ret
506}
507
// ErrServerStopped is returned when an operation is no longer legal because
// the server has been stopped.
var ErrServerStopped error = errors.New("grpc: the server has been stopped")
511
512func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
513 if s.opts.creds == nil {
514 return rawConn, nil, nil
515 }
516 return s.opts.creds.ServerHandshake(rawConn)
517}
518
// listenSocket wraps a net.Listener together with the channelz ID under which
// Serve registers it.
type listenSocket struct {
	net.Listener
	channelzID int64
}
523
524func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
525 return &channelz.SocketInternalMetric{
526 SocketOptions: channelz.GetSocketOption(l.Listener),
527 LocalAddr: l.Listener.Addr(),
528 }
529}
530
531func (l *listenSocket) Close() error {
532 err := l.Listener.Close()
533 if channelz.IsOn() {
534 channelz.RemoveEntry(l.channelzID)
535 }
536 return err
537}
538
539// Serve accepts incoming connections on the listener lis, creating a new
540// ServerTransport and service goroutine for each. The service goroutines
541// read gRPC requests and then call the registered handlers to reply to them.
542// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
543// this method returns.
544// Serve will return a non-nil error unless Stop or GracefulStop is called.
545func (s *Server) Serve(lis net.Listener) error {
546 s.mu.Lock()
547 s.printf("serving")
548 s.serve = true
549 if s.lis == nil {
550 // Serve called after Stop or GracefulStop.
551 s.mu.Unlock()
552 lis.Close()
553 return ErrServerStopped
554 }
555
556 s.serveWG.Add(1)
557 defer func() {
558 s.serveWG.Done()
559 select {
560 // Stop or GracefulStop called; block until done and return nil.
561 case <-s.quit:
562 <-s.done
563 default:
564 }
565 }()
566
567 ls := &listenSocket{Listener: lis}
568 s.lis[ls] = true
569
570 if channelz.IsOn() {
571 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
572 }
573 s.mu.Unlock()
574
575 defer func() {
576 s.mu.Lock()
577 if s.lis != nil && s.lis[ls] {
578 ls.Close()
579 delete(s.lis, ls)
580 }
581 s.mu.Unlock()
582 }()
583
584 var tempDelay time.Duration // how long to sleep on accept failure
585
586 for {
587 rawConn, err := lis.Accept()
588 if err != nil {
589 if ne, ok := err.(interface {
590 Temporary() bool
591 }); ok && ne.Temporary() {
592 if tempDelay == 0 {
593 tempDelay = 5 * time.Millisecond
594 } else {
595 tempDelay *= 2
596 }
597 if max := 1 * time.Second; tempDelay > max {
598 tempDelay = max
599 }
600 s.mu.Lock()
601 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
602 s.mu.Unlock()
603 timer := time.NewTimer(tempDelay)
604 select {
605 case <-timer.C:
606 case <-s.quit:
607 timer.Stop()
608 return nil
609 }
610 continue
611 }
612 s.mu.Lock()
613 s.printf("done serving; Accept = %v", err)
614 s.mu.Unlock()
615
616 select {
617 case <-s.quit:
618 return nil
619 default:
620 }
621 return err
622 }
623 tempDelay = 0
624 // Start a new goroutine to deal with rawConn so we don't stall this Accept
625 // loop goroutine.
626 //
627 // Make sure we account for the goroutine so GracefulStop doesn't nil out
628 // s.conns before this conn can be added.
629 s.serveWG.Add(1)
630 go func() {
631 s.handleRawConn(rawConn)
632 s.serveWG.Done()
633 }()
634 }
635}
636
637// handleRawConn forks a goroutine to handle a just-accepted connection that
638// has not had any I/O performed on it yet.
639func (s *Server) handleRawConn(rawConn net.Conn) {
640 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
641 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
642 if err != nil {
643 // ErrConnDispatched means that the connection was dispatched away from
644 // gRPC; those connections should be left open.
645 if err != credentials.ErrConnDispatched {
646 s.mu.Lock()
647 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
648 s.mu.Unlock()
649 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
650 rawConn.Close()
651 }
652 rawConn.SetDeadline(time.Time{})
653 return
654 }
655
656 s.mu.Lock()
657 if s.conns == nil {
658 s.mu.Unlock()
659 conn.Close()
660 return
661 }
662 s.mu.Unlock()
663
664 // Finish handshaking (HTTP2)
665 st := s.newHTTP2Transport(conn, authInfo)
666 if st == nil {
667 return
668 }
669
670 rawConn.SetDeadline(time.Time{})
671 if !s.addConn(st) {
672 return
673 }
674 go func() {
675 s.serveStreams(st)
676 s.removeConn(st)
677 }()
678}
679
680// newHTTP2Transport sets up a http/2 transport (using the
681// gRPC http2 server transport in transport/http2_server.go).
682func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
683 config := &transport.ServerConfig{
684 MaxStreams: s.opts.maxConcurrentStreams,
685 AuthInfo: authInfo,
686 InTapHandle: s.opts.inTapHandle,
687 StatsHandler: s.opts.statsHandler,
688 KeepaliveParams: s.opts.keepaliveParams,
689 KeepalivePolicy: s.opts.keepalivePolicy,
690 InitialWindowSize: s.opts.initialWindowSize,
691 InitialConnWindowSize: s.opts.initialConnWindowSize,
692 WriteBufferSize: s.opts.writeBufferSize,
693 ReadBufferSize: s.opts.readBufferSize,
694 ChannelzParentID: s.channelzID,
695 MaxHeaderListSize: s.opts.maxHeaderListSize,
696 }
697 st, err := transport.NewServerTransport("http2", c, config)
698 if err != nil {
699 s.mu.Lock()
700 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
701 s.mu.Unlock()
702 c.Close()
703 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
704 return nil
705 }
706
707 return st
708}
709
710func (s *Server) serveStreams(st transport.ServerTransport) {
711 defer st.Close()
712 var wg sync.WaitGroup
713 st.HandleStreams(func(stream *transport.Stream) {
714 wg.Add(1)
715 go func() {
716 defer wg.Done()
717 s.handleStream(st, stream, s.traceInfo(st, stream))
718 }()
719 }, func(ctx context.Context, method string) context.Context {
720 if !EnableTracing {
721 return ctx
722 }
723 tr := trace.New("grpc.Recv."+methodFamily(method), method)
724 return trace.NewContext(ctx, tr)
725 })
726 wg.Wait()
727}
728
729var _ http.Handler = (*Server)(nil)
730
731// ServeHTTP implements the Go standard library's http.Handler
732// interface by responding to the gRPC request r, by looking up
733// the requested gRPC method in the gRPC server s.
734//
735// The provided HTTP request must have arrived on an HTTP/2
736// connection. When using the Go standard library's server,
737// practically this means that the Request must also have arrived
738// over TLS.
739//
740// To share one port (such as 443 for https) between gRPC and an
741// existing http.Handler, use a root http.Handler such as:
742//
743// if r.ProtoMajor == 2 && strings.HasPrefix(
744// r.Header.Get("Content-Type"), "application/grpc") {
745// grpcServer.ServeHTTP(w, r)
746// } else {
747// yourMux.ServeHTTP(w, r)
748// }
749//
750// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
751// separate from grpc-go's HTTP/2 server. Performance and features may vary
752// between the two paths. ServeHTTP does not support some gRPC features
753// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
754// and subject to change.
755func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
756 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
757 if err != nil {
758 http.Error(w, err.Error(), http.StatusInternalServerError)
759 return
760 }
761 if !s.addConn(st) {
762 return
763 }
764 defer s.removeConn(st)
765 s.serveStreams(st)
766}
767
768// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
769// If tracing is not enabled, it returns nil.
770func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
771 tr, ok := trace.FromContext(stream.Context())
772 if !ok {
773 return nil
774 }
775
776 trInfo = &traceInfo{
777 tr: tr,
778 firstLine: firstLine{
779 client: false,
780 remoteAddr: st.RemoteAddr(),
781 },
782 }
783 if dl, ok := stream.Context().Deadline(); ok {
784 trInfo.firstLine.deadline = time.Until(dl)
785 }
786 return trInfo
787}
788
789func (s *Server) addConn(st transport.ServerTransport) bool {
790 s.mu.Lock()
791 defer s.mu.Unlock()
792 if s.conns == nil {
793 st.Close()
794 return false
795 }
796 if s.drain {
797 // Transport added after we drained our existing conns: drain it
798 // immediately.
799 st.Drain()
800 }
801 s.conns[st] = true
802 return true
803}
804
805func (s *Server) removeConn(st transport.ServerTransport) {
806 s.mu.Lock()
807 defer s.mu.Unlock()
808 if s.conns != nil {
809 delete(s.conns, st)
810 s.cv.Broadcast()
811 }
812}
813
814func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
815 return &channelz.ServerInternalMetric{
816 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
817 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
818 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
819 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
820 }
821}
822
823func (s *Server) incrCallsStarted() {
824 atomic.AddInt64(&s.czData.callsStarted, 1)
825 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
826}
827
828func (s *Server) incrCallsSucceeded() {
829 atomic.AddInt64(&s.czData.callsSucceeded, 1)
830}
831
832func (s *Server) incrCallsFailed() {
833 atomic.AddInt64(&s.czData.callsFailed, 1)
834}
835
836func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
837 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
838 if err != nil {
839 grpclog.Errorln("grpc: server failed to encode response: ", err)
840 return err
841 }
842 compData, err := compress(data, cp, comp)
843 if err != nil {
844 grpclog.Errorln("grpc: server failed to compress response: ", err)
845 return err
846 }
847 hdr, payload := msgHeader(data, compData)
848 // TODO(dfawley): should we be checking len(data) instead?
849 if len(payload) > s.opts.maxSendMessageSize {
850 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
851 }
852 err = t.Write(stream, hdr, payload, opts)
853 if err == nil && s.opts.statsHandler != nil {
854 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
855 }
856 return err
857}
858
859func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
860 if channelz.IsOn() {
861 s.incrCallsStarted()
862 defer func() {
863 if err != nil && err != io.EOF {
864 s.incrCallsFailed()
865 } else {
866 s.incrCallsSucceeded()
867 }
868 }()
869 }
870 sh := s.opts.statsHandler
871 if sh != nil {
872 beginTime := time.Now()
873 begin := &stats.Begin{
874 BeginTime: beginTime,
875 }
876 sh.HandleRPC(stream.Context(), begin)
877 defer func() {
878 end := &stats.End{
879 BeginTime: beginTime,
880 EndTime: time.Now(),
881 }
882 if err != nil && err != io.EOF {
883 end.Error = toRPCErr(err)
884 }
885 sh.HandleRPC(stream.Context(), end)
886 }()
887 }
888 if trInfo != nil {
889 defer trInfo.tr.Finish()
890 trInfo.tr.LazyLog(&trInfo.firstLine, false)
891 defer func() {
892 if err != nil && err != io.EOF {
893 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
894 trInfo.tr.SetError()
895 }
896 }()
897 }
898
899 binlog := binarylog.GetMethodLogger(stream.Method())
900 if binlog != nil {
901 ctx := stream.Context()
902 md, _ := metadata.FromIncomingContext(ctx)
903 logEntry := &binarylog.ClientHeader{
904 Header: md,
905 MethodName: stream.Method(),
906 PeerAddr: nil,
907 }
908 if deadline, ok := ctx.Deadline(); ok {
909 logEntry.Timeout = time.Until(deadline)
910 if logEntry.Timeout < 0 {
911 logEntry.Timeout = 0
912 }
913 }
914 if a := md[":authority"]; len(a) > 0 {
915 logEntry.Authority = a[0]
916 }
917 if peer, ok := peer.FromContext(ctx); ok {
918 logEntry.PeerAddr = peer.Addr
919 }
920 binlog.Log(logEntry)
921 }
922
923 // comp and cp are used for compression. decomp and dc are used for
924 // decompression. If comp and decomp are both set, they are the same;
925 // however they are kept separate to ensure that at most one of the
926 // compressor/decompressor variable pairs are set for use later.
927 var comp, decomp encoding.Compressor
928 var cp Compressor
929 var dc Decompressor
930
931 // If dc is set and matches the stream's compression, use it. Otherwise, try
932 // to find a matching registered compressor for decomp.
933 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
934 dc = s.opts.dc
935 } else if rc != "" && rc != encoding.Identity {
936 decomp = encoding.GetCompressor(rc)
937 if decomp == nil {
938 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
939 t.WriteStatus(stream, st)
940 return st.Err()
941 }
942 }
943
944 // If cp is set, use it. Otherwise, attempt to compress the response using
945 // the incoming message compression method.
946 //
947 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
948 if s.opts.cp != nil {
949 cp = s.opts.cp
950 stream.SetSendCompress(cp.Type())
951 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
952 // Legacy compressor not specified; attempt to respond with same encoding.
953 comp = encoding.GetCompressor(rc)
954 if comp != nil {
955 stream.SetSendCompress(rc)
956 }
957 }
958
959 var payInfo *payloadInfo
960 if sh != nil || binlog != nil {
961 payInfo = &payloadInfo{}
962 }
963 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
964 if err != nil {
965 if st, ok := status.FromError(err); ok {
966 if e := t.WriteStatus(stream, st); e != nil {
967 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
968 }
969 }
970 return err
971 }
972 if channelz.IsOn() {
973 t.IncrMsgRecv()
974 }
975 df := func(v interface{}) error {
976 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
977 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
978 }
979 if sh != nil {
980 sh.HandleRPC(stream.Context(), &stats.InPayload{
981 RecvTime: time.Now(),
982 Payload: v,
983 WireLength: payInfo.wireLength,
984 Data: d,
985 Length: len(d),
986 })
987 }
988 if binlog != nil {
989 binlog.Log(&binarylog.ClientMessage{
990 Message: d,
991 })
992 }
993 if trInfo != nil {
994 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
995 }
996 return nil
997 }
998 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
999 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
1000 if appErr != nil {
1001 appStatus, ok := status.FromError(appErr)
1002 if !ok {
1003 // Convert appErr if it is not a grpc status error.
1004 appErr = status.Error(codes.Unknown, appErr.Error())
1005 appStatus, _ = status.FromError(appErr)
1006 }
1007 if trInfo != nil {
1008 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1009 trInfo.tr.SetError()
1010 }
1011 if e := t.WriteStatus(stream, appStatus); e != nil {
1012 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1013 }
1014 if binlog != nil {
1015 if h, _ := stream.Header(); h.Len() > 0 {
1016 // Only log serverHeader if there was header. Otherwise it can
1017 // be trailer only.
1018 binlog.Log(&binarylog.ServerHeader{
1019 Header: h,
1020 })
1021 }
1022 binlog.Log(&binarylog.ServerTrailer{
1023 Trailer: stream.Trailer(),
1024 Err: appErr,
1025 })
1026 }
1027 return appErr
1028 }
1029 if trInfo != nil {
1030 trInfo.tr.LazyLog(stringer("OK"), false)
1031 }
1032 opts := &transport.Options{Last: true}
1033
1034 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1035 if err == io.EOF {
1036 // The entire stream is done (for unary RPC only).
1037 return err
1038 }
1039 if s, ok := status.FromError(err); ok {
1040 if e := t.WriteStatus(stream, s); e != nil {
1041 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1042 }
1043 } else {
1044 switch st := err.(type) {
1045 case transport.ConnectionError:
1046 // Nothing to do here.
1047 default:
1048 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1049 }
1050 }
1051 if binlog != nil {
1052 h, _ := stream.Header()
1053 binlog.Log(&binarylog.ServerHeader{
1054 Header: h,
1055 })
1056 binlog.Log(&binarylog.ServerTrailer{
1057 Trailer: stream.Trailer(),
1058 Err: appErr,
1059 })
1060 }
1061 return err
1062 }
1063 if binlog != nil {
1064 h, _ := stream.Header()
1065 binlog.Log(&binarylog.ServerHeader{
1066 Header: h,
1067 })
1068 binlog.Log(&binarylog.ServerMessage{
1069 Message: reply,
1070 })
1071 }
1072 if channelz.IsOn() {
1073 t.IncrMsgSent()
1074 }
1075 if trInfo != nil {
1076 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1077 }
1078 // TODO: Should we be logging if writing status failed here, like above?
1079 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1080 // error or allow the stats handler to see it?
1081 err = t.WriteStatus(stream, status.New(codes.OK, ""))
1082 if binlog != nil {
1083 binlog.Log(&binarylog.ServerTrailer{
1084 Trailer: stream.Trailer(),
1085 Err: appErr,
1086 })
1087 }
1088 return err
1089}
1090
1091func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1092 if channelz.IsOn() {
1093 s.incrCallsStarted()
1094 defer func() {
1095 if err != nil && err != io.EOF {
1096 s.incrCallsFailed()
1097 } else {
1098 s.incrCallsSucceeded()
1099 }
1100 }()
1101 }
1102 sh := s.opts.statsHandler
1103 if sh != nil {
1104 beginTime := time.Now()
1105 begin := &stats.Begin{
1106 BeginTime: beginTime,
1107 }
1108 sh.HandleRPC(stream.Context(), begin)
1109 defer func() {
1110 end := &stats.End{
1111 BeginTime: beginTime,
1112 EndTime: time.Now(),
1113 }
1114 if err != nil && err != io.EOF {
1115 end.Error = toRPCErr(err)
1116 }
1117 sh.HandleRPC(stream.Context(), end)
1118 }()
1119 }
1120 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1121 ss := &serverStream{
1122 ctx: ctx,
1123 t: t,
1124 s: stream,
1125 p: &parser{r: stream},
1126 codec: s.getCodec(stream.ContentSubtype()),
1127 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1128 maxSendMessageSize: s.opts.maxSendMessageSize,
1129 trInfo: trInfo,
1130 statsHandler: sh,
1131 }
1132
1133 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1134 if ss.binlog != nil {
1135 md, _ := metadata.FromIncomingContext(ctx)
1136 logEntry := &binarylog.ClientHeader{
1137 Header: md,
1138 MethodName: stream.Method(),
1139 PeerAddr: nil,
1140 }
1141 if deadline, ok := ctx.Deadline(); ok {
1142 logEntry.Timeout = time.Until(deadline)
1143 if logEntry.Timeout < 0 {
1144 logEntry.Timeout = 0
1145 }
1146 }
1147 if a := md[":authority"]; len(a) > 0 {
1148 logEntry.Authority = a[0]
1149 }
1150 if peer, ok := peer.FromContext(ss.Context()); ok {
1151 logEntry.PeerAddr = peer.Addr
1152 }
1153 ss.binlog.Log(logEntry)
1154 }
1155
1156 // If dc is set and matches the stream's compression, use it. Otherwise, try
1157 // to find a matching registered compressor for decomp.
1158 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1159 ss.dc = s.opts.dc
1160 } else if rc != "" && rc != encoding.Identity {
1161 ss.decomp = encoding.GetCompressor(rc)
1162 if ss.decomp == nil {
1163 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1164 t.WriteStatus(ss.s, st)
1165 return st.Err()
1166 }
1167 }
1168
1169 // If cp is set, use it. Otherwise, attempt to compress the response using
1170 // the incoming message compression method.
1171 //
1172 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1173 if s.opts.cp != nil {
1174 ss.cp = s.opts.cp
1175 stream.SetSendCompress(s.opts.cp.Type())
1176 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1177 // Legacy compressor not specified; attempt to respond with same encoding.
1178 ss.comp = encoding.GetCompressor(rc)
1179 if ss.comp != nil {
1180 stream.SetSendCompress(rc)
1181 }
1182 }
1183
1184 if trInfo != nil {
1185 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1186 defer func() {
1187 ss.mu.Lock()
1188 if err != nil && err != io.EOF {
1189 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1190 ss.trInfo.tr.SetError()
1191 }
1192 ss.trInfo.tr.Finish()
1193 ss.trInfo.tr = nil
1194 ss.mu.Unlock()
1195 }()
1196 }
1197 var appErr error
1198 var server interface{}
1199 if srv != nil {
1200 server = srv.server
1201 }
1202 if s.opts.streamInt == nil {
1203 appErr = sd.Handler(server, ss)
1204 } else {
1205 info := &StreamServerInfo{
1206 FullMethod: stream.Method(),
1207 IsClientStream: sd.ClientStreams,
1208 IsServerStream: sd.ServerStreams,
1209 }
1210 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1211 }
1212 if appErr != nil {
1213 appStatus, ok := status.FromError(appErr)
1214 if !ok {
1215 appStatus = status.New(codes.Unknown, appErr.Error())
1216 appErr = appStatus.Err()
1217 }
1218 if trInfo != nil {
1219 ss.mu.Lock()
1220 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1221 ss.trInfo.tr.SetError()
1222 ss.mu.Unlock()
1223 }
1224 t.WriteStatus(ss.s, appStatus)
1225 if ss.binlog != nil {
1226 ss.binlog.Log(&binarylog.ServerTrailer{
1227 Trailer: ss.s.Trailer(),
1228 Err: appErr,
1229 })
1230 }
1231 // TODO: Should we log an error from WriteStatus here and below?
1232 return appErr
1233 }
1234 if trInfo != nil {
1235 ss.mu.Lock()
1236 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1237 ss.mu.Unlock()
1238 }
1239 err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
1240 if ss.binlog != nil {
1241 ss.binlog.Log(&binarylog.ServerTrailer{
1242 Trailer: ss.s.Trailer(),
1243 Err: appErr,
1244 })
1245 }
1246 return err
1247}
1248
1249func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1250 sm := stream.Method()
1251 if sm != "" && sm[0] == '/' {
1252 sm = sm[1:]
1253 }
1254 pos := strings.LastIndex(sm, "/")
1255 if pos == -1 {
1256 if trInfo != nil {
1257 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1258 trInfo.tr.SetError()
1259 }
1260 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1261 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1262 if trInfo != nil {
1263 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1264 trInfo.tr.SetError()
1265 }
1266 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1267 }
1268 if trInfo != nil {
1269 trInfo.tr.Finish()
1270 }
1271 return
1272 }
1273 service := sm[:pos]
1274 method := sm[pos+1:]
1275
1276 srv, knownService := s.m[service]
1277 if knownService {
1278 if md, ok := srv.md[method]; ok {
1279 s.processUnaryRPC(t, stream, srv, md, trInfo)
1280 return
1281 }
1282 if sd, ok := srv.sd[method]; ok {
1283 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1284 return
1285 }
1286 }
1287 // Unknown service, or known server unknown method.
1288 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1289 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1290 return
1291 }
1292 var errDesc string
1293 if !knownService {
1294 errDesc = fmt.Sprintf("unknown service %v", service)
1295 } else {
1296 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1297 }
1298 if trInfo != nil {
1299 trInfo.tr.LazyPrintf("%s", errDesc)
1300 trInfo.tr.SetError()
1301 }
1302 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1303 if trInfo != nil {
1304 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1305 trInfo.tr.SetError()
1306 }
1307 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1308 }
1309 if trInfo != nil {
1310 trInfo.tr.Finish()
1311 }
1312}
1313
// streamKey is the context key under which the ServerTransportStream of an
// RPC is stored. It is an unexported empty struct type, so only this package
// can produce a matching key.
type streamKey struct{}
1316
1317// NewContextWithServerTransportStream creates a new context from ctx and
1318// attaches stream to it.
1319//
1320// This API is EXPERIMENTAL.
1321func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1322 return context.WithValue(ctx, streamKey{}, stream)
1323}
1324
1325// ServerTransportStream is a minimal interface that a transport stream must
1326// implement. This can be used to mock an actual transport stream for tests of
1327// handler code that use, for example, grpc.SetHeader (which requires some
1328// stream to be in context).
1329//
1330// See also NewContextWithServerTransportStream.
1331//
1332// This API is EXPERIMENTAL.
1333type ServerTransportStream interface {
1334 Method() string
1335 SetHeader(md metadata.MD) error
1336 SendHeader(md metadata.MD) error
1337 SetTrailer(md metadata.MD) error
1338}
1339
1340// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1341// ctx. Returns nil if the given context has no stream associated with it
1342// (which implies it is not an RPC invocation context).
1343//
1344// This API is EXPERIMENTAL.
1345func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1346 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1347 return s
1348}
1349
1350// Stop stops the gRPC server. It immediately closes all open
1351// connections and listeners.
1352// It cancels all active RPCs on the server side and the corresponding
1353// pending RPCs on the client side will get notified by connection
1354// errors.
1355func (s *Server) Stop() {
1356 s.quitOnce.Do(func() {
1357 close(s.quit)
1358 })
1359
1360 defer func() {
1361 s.serveWG.Wait()
1362 s.doneOnce.Do(func() {
1363 close(s.done)
1364 })
1365 }()
1366
1367 s.channelzRemoveOnce.Do(func() {
1368 if channelz.IsOn() {
1369 channelz.RemoveEntry(s.channelzID)
1370 }
1371 })
1372
1373 s.mu.Lock()
1374 listeners := s.lis
1375 s.lis = nil
1376 st := s.conns
1377 s.conns = nil
1378 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1379 s.cv.Broadcast()
1380 s.mu.Unlock()
1381
1382 for lis := range listeners {
1383 lis.Close()
1384 }
1385 for c := range st {
1386 c.Close()
1387 }
1388
1389 s.mu.Lock()
1390 if s.events != nil {
1391 s.events.Finish()
1392 s.events = nil
1393 }
1394 s.mu.Unlock()
1395}
1396
1397// GracefulStop stops the gRPC server gracefully. It stops the server from
1398// accepting new connections and RPCs and blocks until all the pending RPCs are
1399// finished.
1400func (s *Server) GracefulStop() {
1401 s.quitOnce.Do(func() {
1402 close(s.quit)
1403 })
1404
1405 defer func() {
1406 s.doneOnce.Do(func() {
1407 close(s.done)
1408 })
1409 }()
1410
1411 s.channelzRemoveOnce.Do(func() {
1412 if channelz.IsOn() {
1413 channelz.RemoveEntry(s.channelzID)
1414 }
1415 })
1416 s.mu.Lock()
1417 if s.conns == nil {
1418 s.mu.Unlock()
1419 return
1420 }
1421
1422 for lis := range s.lis {
1423 lis.Close()
1424 }
1425 s.lis = nil
1426 if !s.drain {
1427 for st := range s.conns {
1428 st.Drain()
1429 }
1430 s.drain = true
1431 }
1432
1433 // Wait for serving threads to be ready to exit. Only then can we be sure no
1434 // new conns will be created.
1435 s.mu.Unlock()
1436 s.serveWG.Wait()
1437 s.mu.Lock()
1438
1439 for len(s.conns) != 0 {
1440 s.cv.Wait()
1441 }
1442 s.conns = nil
1443 if s.events != nil {
1444 s.events.Finish()
1445 s.events = nil
1446 }
1447 s.mu.Unlock()
1448}
1449
1450// contentSubtype must be lowercase
1451// cannot return nil
1452func (s *Server) getCodec(contentSubtype string) baseCodec {
1453 if s.opts.codec != nil {
1454 return s.opts.codec
1455 }
1456 if contentSubtype == "" {
1457 return encoding.GetCodec(proto.Name)
1458 }
1459 codec := encoding.GetCodec(contentSubtype)
1460 if codec == nil {
1461 return encoding.GetCodec(proto.Name)
1462 }
1463 return codec
1464}
1465
1466// SetHeader sets the header metadata.
1467// When called multiple times, all the provided metadata will be merged.
1468// All the metadata will be sent out when one of the following happens:
1469// - grpc.SendHeader() is called;
1470// - The first response is sent out;
1471// - An RPC status is sent out (error or success).
1472func SetHeader(ctx context.Context, md metadata.MD) error {
1473 if md.Len() == 0 {
1474 return nil
1475 }
1476 stream := ServerTransportStreamFromContext(ctx)
1477 if stream == nil {
1478 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1479 }
1480 return stream.SetHeader(md)
1481}
1482
1483// SendHeader sends header metadata. It may be called at most once.
1484// The provided md and headers set by SetHeader() will be sent.
1485func SendHeader(ctx context.Context, md metadata.MD) error {
1486 stream := ServerTransportStreamFromContext(ctx)
1487 if stream == nil {
1488 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1489 }
1490 if err := stream.SendHeader(md); err != nil {
1491 return toRPCErr(err)
1492 }
1493 return nil
1494}
1495
1496// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1497// When called more than once, all the provided metadata will be merged.
1498func SetTrailer(ctx context.Context, md metadata.MD) error {
1499 if md.Len() == 0 {
1500 return nil
1501 }
1502 stream := ServerTransportStreamFromContext(ctx)
1503 if stream == nil {
1504 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1505 }
1506 return stream.SetTrailer(md)
1507}
1508
1509// Method returns the method string for the server context. The returned
1510// string is in the format of "/service/method".
1511func Method(ctx context.Context) (string, bool) {
1512 s := ServerTransportStreamFromContext(ctx)
1513 if s == nil {
1514 return "", false
1515 }
1516 return s.Method(), true
1517}
1518
1519type channelzServer struct {
1520 s *Server
1521}
1522
1523func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1524 return c.s.channelzMetric()
1525}