blob: e54083d850c1f9d0024c2d0d4be946d87308dab9 [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/grpcsync"
46 "google.golang.org/grpc/internal/transport"
47 "google.golang.org/grpc/keepalive"
48 "google.golang.org/grpc/metadata"
49 "google.golang.org/grpc/peer"
50 "google.golang.org/grpc/stats"
51 "google.golang.org/grpc/status"
52 "google.golang.org/grpc/tap"
53)
54
// Message-size limits applied when the corresponding server options are not
// supplied.
const (
	// defaultServerMaxReceiveMessageSize is the inbound limit: 4 MiB.
	defaultServerMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultServerMaxSendMessageSize is the outbound limit: the largest
	// value an int32 can hold.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
59
60var statusOK = status.New(codes.OK, "")
61
62type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
63
64// MethodDesc represents an RPC service's method specification.
65type MethodDesc struct {
66 MethodName string
67 Handler methodHandler
68}
69
70// ServiceDesc represents an RPC service's specification.
71type ServiceDesc struct {
72 ServiceName string
73 // The pointer to the service interface. Used to check whether the user
74 // provided implementation satisfies the interface requirements.
75 HandlerType interface{}
76 Methods []MethodDesc
77 Streams []StreamDesc
78 Metadata interface{}
79}
80
81// service consists of the information of the server serving this service and
82// the methods in this service.
83type service struct {
84 server interface{} // the server for service methods
85 md map[string]*MethodDesc
86 sd map[string]*StreamDesc
87 mdata interface{}
88}
89
90// Server is a gRPC server to serve RPC requests.
91type Server struct {
92 opts serverOptions
93
94 mu sync.Mutex // guards following
95 lis map[net.Listener]bool
96 conns map[transport.ServerTransport]bool
97 serve bool
98 drain bool
99 cv *sync.Cond // signaled when connections close for GracefulStop
100 m map[string]*service // service name -> service info
101 events trace.EventLog
102
103 quit *grpcsync.Event
104 done *grpcsync.Event
105 channelzRemoveOnce sync.Once
106 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
107
108 channelzID int64 // channelz unique identification number
109 czData *channelzData
110}
111
112type serverOptions struct {
113 creds credentials.TransportCredentials
114 codec baseCodec
115 cp Compressor
116 dc Decompressor
117 unaryInt UnaryServerInterceptor
118 streamInt StreamServerInterceptor
119 inTapHandle tap.ServerInHandle
120 statsHandler stats.Handler
121 maxConcurrentStreams uint32
122 maxReceiveMessageSize int
123 maxSendMessageSize int
124 unknownStreamDesc *StreamDesc
125 keepaliveParams keepalive.ServerParameters
126 keepalivePolicy keepalive.EnforcementPolicy
127 initialWindowSize int32
128 initialConnWindowSize int32
129 writeBufferSize int
130 readBufferSize int
131 connectionTimeout time.Duration
132 maxHeaderListSize *uint32
133 headerTableSize *uint32
134}
135
136var defaultServerOptions = serverOptions{
137 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
138 maxSendMessageSize: defaultServerMaxSendMessageSize,
139 connectionTimeout: 120 * time.Second,
140 writeBufferSize: defaultWriteBufSize,
141 readBufferSize: defaultReadBufSize,
142}
143
144// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
145type ServerOption interface {
146 apply(*serverOptions)
147}
148
149// EmptyServerOption does not alter the server configuration. It can be embedded
150// in another structure to build custom server options.
151//
152// This API is EXPERIMENTAL.
153type EmptyServerOption struct{}
154
155func (EmptyServerOption) apply(*serverOptions) {}
156
157// funcServerOption wraps a function that modifies serverOptions into an
158// implementation of the ServerOption interface.
159type funcServerOption struct {
160 f func(*serverOptions)
161}
162
163func (fdo *funcServerOption) apply(do *serverOptions) {
164 fdo.f(do)
165}
166
167func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
168 return &funcServerOption{
169 f: f,
170 }
171}
172
173// WriteBufferSize determines how much data can be batched before doing a write on the wire.
174// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
175// The default value for this buffer is 32KB.
176// Zero will disable the write buffer such that each write will be on underlying connection.
177// Note: A Send call may not directly translate to a write.
178func WriteBufferSize(s int) ServerOption {
179 return newFuncServerOption(func(o *serverOptions) {
180 o.writeBufferSize = s
181 })
182}
183
184// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
185// for one read syscall.
186// The default value for this buffer is 32KB.
187// Zero will disable read buffer for a connection so data framer can access the underlying
188// conn directly.
189func ReadBufferSize(s int) ServerOption {
190 return newFuncServerOption(func(o *serverOptions) {
191 o.readBufferSize = s
192 })
193}
194
195// InitialWindowSize returns a ServerOption that sets window size for stream.
196// The lower bound for window size is 64K and any value smaller than that will be ignored.
197func InitialWindowSize(s int32) ServerOption {
198 return newFuncServerOption(func(o *serverOptions) {
199 o.initialWindowSize = s
200 })
201}
202
203// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
204// The lower bound for window size is 64K and any value smaller than that will be ignored.
205func InitialConnWindowSize(s int32) ServerOption {
206 return newFuncServerOption(func(o *serverOptions) {
207 o.initialConnWindowSize = s
208 })
209}
210
211// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
212func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
213 if kp.Time > 0 && kp.Time < time.Second {
214 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
215 kp.Time = time.Second
216 }
217
218 return newFuncServerOption(func(o *serverOptions) {
219 o.keepaliveParams = kp
220 })
221}
222
223// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
224func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
225 return newFuncServerOption(func(o *serverOptions) {
226 o.keepalivePolicy = kep
227 })
228}
229
230// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
231//
232// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
233func CustomCodec(codec Codec) ServerOption {
234 return newFuncServerOption(func(o *serverOptions) {
235 o.codec = codec
236 })
237}
238
239// RPCCompressor returns a ServerOption that sets a compressor for outbound
240// messages. For backward compatibility, all outbound messages will be sent
241// using this compressor, regardless of incoming message compression. By
242// default, server messages will be sent using the same compressor with which
243// request messages were sent.
244//
245// Deprecated: use encoding.RegisterCompressor instead.
246func RPCCompressor(cp Compressor) ServerOption {
247 return newFuncServerOption(func(o *serverOptions) {
248 o.cp = cp
249 })
250}
251
252// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
253// messages. It has higher priority than decompressors registered via
254// encoding.RegisterCompressor.
255//
256// Deprecated: use encoding.RegisterCompressor instead.
257func RPCDecompressor(dc Decompressor) ServerOption {
258 return newFuncServerOption(func(o *serverOptions) {
259 o.dc = dc
260 })
261}
262
263// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
264// If this is not set, gRPC uses the default limit.
265//
266// Deprecated: use MaxRecvMsgSize instead.
267func MaxMsgSize(m int) ServerOption {
268 return MaxRecvMsgSize(m)
269}
270
271// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
272// If this is not set, gRPC uses the default 4MB.
273func MaxRecvMsgSize(m int) ServerOption {
274 return newFuncServerOption(func(o *serverOptions) {
275 o.maxReceiveMessageSize = m
276 })
277}
278
279// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
280// If this is not set, gRPC uses the default `math.MaxInt32`.
281func MaxSendMsgSize(m int) ServerOption {
282 return newFuncServerOption(func(o *serverOptions) {
283 o.maxSendMessageSize = m
284 })
285}
286
287// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
288// of concurrent streams to each ServerTransport.
289func MaxConcurrentStreams(n uint32) ServerOption {
290 return newFuncServerOption(func(o *serverOptions) {
291 o.maxConcurrentStreams = n
292 })
293}
294
295// Creds returns a ServerOption that sets credentials for server connections.
296func Creds(c credentials.TransportCredentials) ServerOption {
297 return newFuncServerOption(func(o *serverOptions) {
298 o.creds = c
299 })
300}
301
302// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
303// server. Only one unary interceptor can be installed. The construction of multiple
304// interceptors (e.g., chaining) can be implemented at the caller.
305func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
306 return newFuncServerOption(func(o *serverOptions) {
307 if o.unaryInt != nil {
308 panic("The unary server interceptor was already set and may not be reset.")
309 }
310 o.unaryInt = i
311 })
312}
313
314// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
315// server. Only one stream interceptor can be installed.
316func StreamInterceptor(i StreamServerInterceptor) ServerOption {
317 return newFuncServerOption(func(o *serverOptions) {
318 if o.streamInt != nil {
319 panic("The stream server interceptor was already set and may not be reset.")
320 }
321 o.streamInt = i
322 })
323}
324
325// InTapHandle returns a ServerOption that sets the tap handle for all the server
326// transport to be created. Only one can be installed.
327func InTapHandle(h tap.ServerInHandle) ServerOption {
328 return newFuncServerOption(func(o *serverOptions) {
329 if o.inTapHandle != nil {
330 panic("The tap handle was already set and may not be reset.")
331 }
332 o.inTapHandle = h
333 })
334}
335
336// StatsHandler returns a ServerOption that sets the stats handler for the server.
337func StatsHandler(h stats.Handler) ServerOption {
338 return newFuncServerOption(func(o *serverOptions) {
339 o.statsHandler = h
340 })
341}
342
343// UnknownServiceHandler returns a ServerOption that allows for adding a custom
344// unknown service handler. The provided method is a bidi-streaming RPC service
345// handler that will be invoked instead of returning the "unimplemented" gRPC
346// error whenever a request is received for an unregistered service or method.
347// The handling function has full access to the Context of the request and the
348// stream, and the invocation bypasses interceptors.
349func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
350 return newFuncServerOption(func(o *serverOptions) {
351 o.unknownStreamDesc = &StreamDesc{
352 StreamName: "unknown_service_handler",
353 Handler: streamHandler,
354 // We need to assume that the users of the streamHandler will want to use both.
355 ClientStreams: true,
356 ServerStreams: true,
357 }
358 })
359}
360
361// ConnectionTimeout returns a ServerOption that sets the timeout for
362// connection establishment (up to and including HTTP/2 handshaking) for all
363// new connections. If this is not set, the default is 120 seconds. A zero or
364// negative value will result in an immediate timeout.
365//
366// This API is EXPERIMENTAL.
367func ConnectionTimeout(d time.Duration) ServerOption {
368 return newFuncServerOption(func(o *serverOptions) {
369 o.connectionTimeout = d
370 })
371}
372
373// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
374// of header list that the server is prepared to accept.
375func MaxHeaderListSize(s uint32) ServerOption {
376 return newFuncServerOption(func(o *serverOptions) {
377 o.maxHeaderListSize = &s
378 })
379}
380
381// HeaderTableSize returns a ServerOption that sets the size of dynamic
382// header table for stream.
383//
384// This API is EXPERIMENTAL.
385func HeaderTableSize(s uint32) ServerOption {
386 return newFuncServerOption(func(o *serverOptions) {
387 o.headerTableSize = &s
388 })
389}
390
391// NewServer creates a gRPC server which has no service registered and has not
392// started to accept requests yet.
393func NewServer(opt ...ServerOption) *Server {
394 opts := defaultServerOptions
395 for _, o := range opt {
396 o.apply(&opts)
397 }
398 s := &Server{
399 lis: make(map[net.Listener]bool),
400 opts: opts,
401 conns: make(map[transport.ServerTransport]bool),
402 m: make(map[string]*service),
403 quit: grpcsync.NewEvent(),
404 done: grpcsync.NewEvent(),
405 czData: new(channelzData),
406 }
407 s.cv = sync.NewCond(&s.mu)
408 if EnableTracing {
409 _, file, line, _ := runtime.Caller(1)
410 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
411 }
412
413 if channelz.IsOn() {
414 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
415 }
416 return s
417}
418
419// printf records an event in s's event log, unless s has been stopped.
420// REQUIRES s.mu is held.
421func (s *Server) printf(format string, a ...interface{}) {
422 if s.events != nil {
423 s.events.Printf(format, a...)
424 }
425}
426
427// errorf records an error in s's event log, unless s has been stopped.
428// REQUIRES s.mu is held.
429func (s *Server) errorf(format string, a ...interface{}) {
430 if s.events != nil {
431 s.events.Errorf(format, a...)
432 }
433}
434
435// RegisterService registers a service and its implementation to the gRPC
436// server. It is called from the IDL generated code. This must be called before
437// invoking Serve.
438func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
439 ht := reflect.TypeOf(sd.HandlerType).Elem()
440 st := reflect.TypeOf(ss)
441 if !st.Implements(ht) {
442 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
443 }
444 s.register(sd, ss)
445}
446
447func (s *Server) register(sd *ServiceDesc, ss interface{}) {
448 s.mu.Lock()
449 defer s.mu.Unlock()
450 s.printf("RegisterService(%q)", sd.ServiceName)
451 if s.serve {
452 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
453 }
454 if _, ok := s.m[sd.ServiceName]; ok {
455 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
456 }
457 srv := &service{
458 server: ss,
459 md: make(map[string]*MethodDesc),
460 sd: make(map[string]*StreamDesc),
461 mdata: sd.Metadata,
462 }
463 for i := range sd.Methods {
464 d := &sd.Methods[i]
465 srv.md[d.MethodName] = d
466 }
467 for i := range sd.Streams {
468 d := &sd.Streams[i]
469 srv.sd[d.StreamName] = d
470 }
471 s.m[sd.ServiceName] = srv
472}
473
// MethodInfo describes one RPC: its method name and its streaming type.
type MethodInfo struct {
	// Name is the bare method name, without the service or package name.
	Name string
	// IsClientStream reports whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream reports whether the RPC is a server streaming RPC.
	IsServerStream bool
}
483
484// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
485type ServiceInfo struct {
486 Methods []MethodInfo
487 // Metadata is the metadata specified in ServiceDesc when registering service.
488 Metadata interface{}
489}
490
491// GetServiceInfo returns a map from service names to ServiceInfo.
492// Service names include the package names, in the form of <package>.<service>.
493func (s *Server) GetServiceInfo() map[string]ServiceInfo {
494 ret := make(map[string]ServiceInfo)
495 for n, srv := range s.m {
496 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
497 for m := range srv.md {
498 methods = append(methods, MethodInfo{
499 Name: m,
500 IsClientStream: false,
501 IsServerStream: false,
502 })
503 }
504 for m, d := range srv.sd {
505 methods = append(methods, MethodInfo{
506 Name: m,
507 IsClientStream: d.ClientStreams,
508 IsServerStream: d.ServerStreams,
509 })
510 }
511
512 ret[n] = ServiceInfo{
513 Methods: methods,
514 Metadata: srv.mdata,
515 }
516 }
517 return ret
518}
519
// ErrServerStopped is returned when an operation is no longer legal because
// the server has been stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
523
524func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
525 if s.opts.creds == nil {
526 return rawConn, nil, nil
527 }
528 return s.opts.creds.ServerHandshake(rawConn)
529}
530
// listenSocket wraps a net.Listener together with the channelz ID it was
// registered under.
type listenSocket struct {
	net.Listener
	// channelzID is the ID returned by channelz.RegisterListenSocket; it is
	// only assigned when channelz is on.
	channelzID int64
}
535
536func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
537 return &channelz.SocketInternalMetric{
538 SocketOptions: channelz.GetSocketOption(l.Listener),
539 LocalAddr: l.Listener.Addr(),
540 }
541}
542
543func (l *listenSocket) Close() error {
544 err := l.Listener.Close()
545 if channelz.IsOn() {
546 channelz.RemoveEntry(l.channelzID)
547 }
548 return err
549}
550
551// Serve accepts incoming connections on the listener lis, creating a new
552// ServerTransport and service goroutine for each. The service goroutines
553// read gRPC requests and then call the registered handlers to reply to them.
554// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
555// this method returns.
556// Serve will return a non-nil error unless Stop or GracefulStop is called.
557func (s *Server) Serve(lis net.Listener) error {
558 s.mu.Lock()
559 s.printf("serving")
560 s.serve = true
561 if s.lis == nil {
562 // Serve called after Stop or GracefulStop.
563 s.mu.Unlock()
564 lis.Close()
565 return ErrServerStopped
566 }
567
568 s.serveWG.Add(1)
569 defer func() {
570 s.serveWG.Done()
571 if s.quit.HasFired() {
572 // Stop or GracefulStop called; block until done and return nil.
573 <-s.done.Done()
574 }
575 }()
576
577 ls := &listenSocket{Listener: lis}
578 s.lis[ls] = true
579
580 if channelz.IsOn() {
581 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
582 }
583 s.mu.Unlock()
584
585 defer func() {
586 s.mu.Lock()
587 if s.lis != nil && s.lis[ls] {
588 ls.Close()
589 delete(s.lis, ls)
590 }
591 s.mu.Unlock()
592 }()
593
594 var tempDelay time.Duration // how long to sleep on accept failure
595
596 for {
597 rawConn, err := lis.Accept()
598 if err != nil {
599 if ne, ok := err.(interface {
600 Temporary() bool
601 }); ok && ne.Temporary() {
602 if tempDelay == 0 {
603 tempDelay = 5 * time.Millisecond
604 } else {
605 tempDelay *= 2
606 }
607 if max := 1 * time.Second; tempDelay > max {
608 tempDelay = max
609 }
610 s.mu.Lock()
611 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
612 s.mu.Unlock()
613 timer := time.NewTimer(tempDelay)
614 select {
615 case <-timer.C:
616 case <-s.quit.Done():
617 timer.Stop()
618 return nil
619 }
620 continue
621 }
622 s.mu.Lock()
623 s.printf("done serving; Accept = %v", err)
624 s.mu.Unlock()
625
626 if s.quit.HasFired() {
627 return nil
628 }
629 return err
630 }
631 tempDelay = 0
632 // Start a new goroutine to deal with rawConn so we don't stall this Accept
633 // loop goroutine.
634 //
635 // Make sure we account for the goroutine so GracefulStop doesn't nil out
636 // s.conns before this conn can be added.
637 s.serveWG.Add(1)
638 go func() {
639 s.handleRawConn(rawConn)
640 s.serveWG.Done()
641 }()
642 }
643}
644
645// handleRawConn forks a goroutine to handle a just-accepted connection that
646// has not had any I/O performed on it yet.
647func (s *Server) handleRawConn(rawConn net.Conn) {
648 if s.quit.HasFired() {
649 rawConn.Close()
650 return
651 }
652 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
653 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
654 if err != nil {
655 // ErrConnDispatched means that the connection was dispatched away from
656 // gRPC; those connections should be left open.
657 if err != credentials.ErrConnDispatched {
658 s.mu.Lock()
659 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
660 s.mu.Unlock()
661 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
662 rawConn.Close()
663 }
664 rawConn.SetDeadline(time.Time{})
665 return
666 }
667
668 // Finish handshaking (HTTP2)
669 st := s.newHTTP2Transport(conn, authInfo)
670 if st == nil {
671 return
672 }
673
674 rawConn.SetDeadline(time.Time{})
675 if !s.addConn(st) {
676 return
677 }
678 go func() {
679 s.serveStreams(st)
680 s.removeConn(st)
681 }()
682}
683
684// newHTTP2Transport sets up a http/2 transport (using the
685// gRPC http2 server transport in transport/http2_server.go).
686func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
687 config := &transport.ServerConfig{
688 MaxStreams: s.opts.maxConcurrentStreams,
689 AuthInfo: authInfo,
690 InTapHandle: s.opts.inTapHandle,
691 StatsHandler: s.opts.statsHandler,
692 KeepaliveParams: s.opts.keepaliveParams,
693 KeepalivePolicy: s.opts.keepalivePolicy,
694 InitialWindowSize: s.opts.initialWindowSize,
695 InitialConnWindowSize: s.opts.initialConnWindowSize,
696 WriteBufferSize: s.opts.writeBufferSize,
697 ReadBufferSize: s.opts.readBufferSize,
698 ChannelzParentID: s.channelzID,
699 MaxHeaderListSize: s.opts.maxHeaderListSize,
700 HeaderTableSize: s.opts.headerTableSize,
701 }
702 st, err := transport.NewServerTransport("http2", c, config)
703 if err != nil {
704 s.mu.Lock()
705 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
706 s.mu.Unlock()
707 c.Close()
708 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
709 return nil
710 }
711
712 return st
713}
714
715func (s *Server) serveStreams(st transport.ServerTransport) {
716 defer st.Close()
717 var wg sync.WaitGroup
718 st.HandleStreams(func(stream *transport.Stream) {
719 wg.Add(1)
720 go func() {
721 defer wg.Done()
722 s.handleStream(st, stream, s.traceInfo(st, stream))
723 }()
724 }, func(ctx context.Context, method string) context.Context {
725 if !EnableTracing {
726 return ctx
727 }
728 tr := trace.New("grpc.Recv."+methodFamily(method), method)
729 return trace.NewContext(ctx, tr)
730 })
731 wg.Wait()
732}
733
734var _ http.Handler = (*Server)(nil)
735
736// ServeHTTP implements the Go standard library's http.Handler
737// interface by responding to the gRPC request r, by looking up
738// the requested gRPC method in the gRPC server s.
739//
740// The provided HTTP request must have arrived on an HTTP/2
741// connection. When using the Go standard library's server,
742// practically this means that the Request must also have arrived
743// over TLS.
744//
745// To share one port (such as 443 for https) between gRPC and an
746// existing http.Handler, use a root http.Handler such as:
747//
748// if r.ProtoMajor == 2 && strings.HasPrefix(
749// r.Header.Get("Content-Type"), "application/grpc") {
750// grpcServer.ServeHTTP(w, r)
751// } else {
752// yourMux.ServeHTTP(w, r)
753// }
754//
755// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
756// separate from grpc-go's HTTP/2 server. Performance and features may vary
757// between the two paths. ServeHTTP does not support some gRPC features
758// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
759// and subject to change.
760func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
761 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
762 if err != nil {
763 http.Error(w, err.Error(), http.StatusInternalServerError)
764 return
765 }
766 if !s.addConn(st) {
767 return
768 }
769 defer s.removeConn(st)
770 s.serveStreams(st)
771}
772
773// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
774// If tracing is not enabled, it returns nil.
775func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
776 if !EnableTracing {
777 return nil
778 }
779 tr, ok := trace.FromContext(stream.Context())
780 if !ok {
781 return nil
782 }
783
784 trInfo = &traceInfo{
785 tr: tr,
786 firstLine: firstLine{
787 client: false,
788 remoteAddr: st.RemoteAddr(),
789 },
790 }
791 if dl, ok := stream.Context().Deadline(); ok {
792 trInfo.firstLine.deadline = time.Until(dl)
793 }
794 return trInfo
795}
796
797func (s *Server) addConn(st transport.ServerTransport) bool {
798 s.mu.Lock()
799 defer s.mu.Unlock()
800 if s.conns == nil {
801 st.Close()
802 return false
803 }
804 if s.drain {
805 // Transport added after we drained our existing conns: drain it
806 // immediately.
807 st.Drain()
808 }
809 s.conns[st] = true
810 return true
811}
812
813func (s *Server) removeConn(st transport.ServerTransport) {
814 s.mu.Lock()
815 defer s.mu.Unlock()
816 if s.conns != nil {
817 delete(s.conns, st)
818 s.cv.Broadcast()
819 }
820}
821
822func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
823 return &channelz.ServerInternalMetric{
824 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
825 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
826 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
827 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
828 }
829}
830
831func (s *Server) incrCallsStarted() {
832 atomic.AddInt64(&s.czData.callsStarted, 1)
833 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
834}
835
836func (s *Server) incrCallsSucceeded() {
837 atomic.AddInt64(&s.czData.callsSucceeded, 1)
838}
839
840func (s *Server) incrCallsFailed() {
841 atomic.AddInt64(&s.czData.callsFailed, 1)
842}
843
844func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
845 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
846 if err != nil {
847 grpclog.Errorln("grpc: server failed to encode response: ", err)
848 return err
849 }
850 compData, err := compress(data, cp, comp)
851 if err != nil {
852 grpclog.Errorln("grpc: server failed to compress response: ", err)
853 return err
854 }
855 hdr, payload := msgHeader(data, compData)
856 // TODO(dfawley): should we be checking len(data) instead?
857 if len(payload) > s.opts.maxSendMessageSize {
858 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
859 }
860 err = t.Write(stream, hdr, payload, opts)
861 if err == nil && s.opts.statsHandler != nil {
862 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
863 }
864 return err
865}
866
867func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
868 if channelz.IsOn() {
869 s.incrCallsStarted()
870 defer func() {
871 if err != nil && err != io.EOF {
872 s.incrCallsFailed()
873 } else {
874 s.incrCallsSucceeded()
875 }
876 }()
877 }
878 sh := s.opts.statsHandler
879 if sh != nil {
880 beginTime := time.Now()
881 begin := &stats.Begin{
882 BeginTime: beginTime,
883 }
884 sh.HandleRPC(stream.Context(), begin)
885 defer func() {
886 end := &stats.End{
887 BeginTime: beginTime,
888 EndTime: time.Now(),
889 }
890 if err != nil && err != io.EOF {
891 end.Error = toRPCErr(err)
892 }
893 sh.HandleRPC(stream.Context(), end)
894 }()
895 }
896 if trInfo != nil {
897 defer trInfo.tr.Finish()
898 trInfo.tr.LazyLog(&trInfo.firstLine, false)
899 defer func() {
900 if err != nil && err != io.EOF {
901 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
902 trInfo.tr.SetError()
903 }
904 }()
905 }
906
907 binlog := binarylog.GetMethodLogger(stream.Method())
908 if binlog != nil {
909 ctx := stream.Context()
910 md, _ := metadata.FromIncomingContext(ctx)
911 logEntry := &binarylog.ClientHeader{
912 Header: md,
913 MethodName: stream.Method(),
914 PeerAddr: nil,
915 }
916 if deadline, ok := ctx.Deadline(); ok {
917 logEntry.Timeout = time.Until(deadline)
918 if logEntry.Timeout < 0 {
919 logEntry.Timeout = 0
920 }
921 }
922 if a := md[":authority"]; len(a) > 0 {
923 logEntry.Authority = a[0]
924 }
925 if peer, ok := peer.FromContext(ctx); ok {
926 logEntry.PeerAddr = peer.Addr
927 }
928 binlog.Log(logEntry)
929 }
930
931 // comp and cp are used for compression. decomp and dc are used for
932 // decompression. If comp and decomp are both set, they are the same;
933 // however they are kept separate to ensure that at most one of the
934 // compressor/decompressor variable pairs are set for use later.
935 var comp, decomp encoding.Compressor
936 var cp Compressor
937 var dc Decompressor
938
939 // If dc is set and matches the stream's compression, use it. Otherwise, try
940 // to find a matching registered compressor for decomp.
941 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
942 dc = s.opts.dc
943 } else if rc != "" && rc != encoding.Identity {
944 decomp = encoding.GetCompressor(rc)
945 if decomp == nil {
946 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
947 t.WriteStatus(stream, st)
948 return st.Err()
949 }
950 }
951
952 // If cp is set, use it. Otherwise, attempt to compress the response using
953 // the incoming message compression method.
954 //
955 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
956 if s.opts.cp != nil {
957 cp = s.opts.cp
958 stream.SetSendCompress(cp.Type())
959 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
960 // Legacy compressor not specified; attempt to respond with same encoding.
961 comp = encoding.GetCompressor(rc)
962 if comp != nil {
963 stream.SetSendCompress(rc)
964 }
965 }
966
967 var payInfo *payloadInfo
968 if sh != nil || binlog != nil {
969 payInfo = &payloadInfo{}
970 }
971 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
972 if err != nil {
973 if st, ok := status.FromError(err); ok {
974 if e := t.WriteStatus(stream, st); e != nil {
975 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
976 }
977 }
978 return err
979 }
980 if channelz.IsOn() {
981 t.IncrMsgRecv()
982 }
983 df := func(v interface{}) error {
984 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
985 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
986 }
987 if sh != nil {
988 sh.HandleRPC(stream.Context(), &stats.InPayload{
989 RecvTime: time.Now(),
990 Payload: v,
991 WireLength: payInfo.wireLength,
992 Data: d,
993 Length: len(d),
994 })
995 }
996 if binlog != nil {
997 binlog.Log(&binarylog.ClientMessage{
998 Message: d,
999 })
1000 }
1001 if trInfo != nil {
1002 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1003 }
1004 return nil
1005 }
1006 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1007 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
1008 if appErr != nil {
1009 appStatus, ok := status.FromError(appErr)
1010 if !ok {
1011 // Convert appErr if it is not a grpc status error.
1012 appErr = status.Error(codes.Unknown, appErr.Error())
1013 appStatus, _ = status.FromError(appErr)
1014 }
1015 if trInfo != nil {
1016 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1017 trInfo.tr.SetError()
1018 }
1019 if e := t.WriteStatus(stream, appStatus); e != nil {
1020 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1021 }
1022 if binlog != nil {
1023 if h, _ := stream.Header(); h.Len() > 0 {
1024 // Only log serverHeader if there was header. Otherwise it can
1025 // be trailer only.
1026 binlog.Log(&binarylog.ServerHeader{
1027 Header: h,
1028 })
1029 }
1030 binlog.Log(&binarylog.ServerTrailer{
1031 Trailer: stream.Trailer(),
1032 Err: appErr,
1033 })
1034 }
1035 return appErr
1036 }
1037 if trInfo != nil {
1038 trInfo.tr.LazyLog(stringer("OK"), false)
1039 }
1040 opts := &transport.Options{Last: true}
1041
1042 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1043 if err == io.EOF {
1044 // The entire stream is done (for unary RPC only).
1045 return err
1046 }
1047 if s, ok := status.FromError(err); ok {
1048 if e := t.WriteStatus(stream, s); e != nil {
1049 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1050 }
1051 } else {
1052 switch st := err.(type) {
1053 case transport.ConnectionError:
1054 // Nothing to do here.
1055 default:
1056 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1057 }
1058 }
1059 if binlog != nil {
1060 h, _ := stream.Header()
1061 binlog.Log(&binarylog.ServerHeader{
1062 Header: h,
1063 })
1064 binlog.Log(&binarylog.ServerTrailer{
1065 Trailer: stream.Trailer(),
1066 Err: appErr,
1067 })
1068 }
1069 return err
1070 }
1071 if binlog != nil {
1072 h, _ := stream.Header()
1073 binlog.Log(&binarylog.ServerHeader{
1074 Header: h,
1075 })
1076 binlog.Log(&binarylog.ServerMessage{
1077 Message: reply,
1078 })
1079 }
1080 if channelz.IsOn() {
1081 t.IncrMsgSent()
1082 }
1083 if trInfo != nil {
1084 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1085 }
1086 // TODO: Should we be logging if writing status failed here, like above?
1087 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1088 // error or allow the stats handler to see it?
1089 err = t.WriteStatus(stream, statusOK)
1090 if binlog != nil {
1091 binlog.Log(&binarylog.ServerTrailer{
1092 Trailer: stream.Trailer(),
1093 Err: appErr,
1094 })
1095 }
1096 return err
1097}
1098
1099func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1100 if channelz.IsOn() {
1101 s.incrCallsStarted()
1102 defer func() {
1103 if err != nil && err != io.EOF {
1104 s.incrCallsFailed()
1105 } else {
1106 s.incrCallsSucceeded()
1107 }
1108 }()
1109 }
1110 sh := s.opts.statsHandler
1111 if sh != nil {
1112 beginTime := time.Now()
1113 begin := &stats.Begin{
1114 BeginTime: beginTime,
1115 }
1116 sh.HandleRPC(stream.Context(), begin)
1117 defer func() {
1118 end := &stats.End{
1119 BeginTime: beginTime,
1120 EndTime: time.Now(),
1121 }
1122 if err != nil && err != io.EOF {
1123 end.Error = toRPCErr(err)
1124 }
1125 sh.HandleRPC(stream.Context(), end)
1126 }()
1127 }
1128 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1129 ss := &serverStream{
1130 ctx: ctx,
1131 t: t,
1132 s: stream,
1133 p: &parser{r: stream},
1134 codec: s.getCodec(stream.ContentSubtype()),
1135 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1136 maxSendMessageSize: s.opts.maxSendMessageSize,
1137 trInfo: trInfo,
1138 statsHandler: sh,
1139 }
1140
1141 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1142 if ss.binlog != nil {
1143 md, _ := metadata.FromIncomingContext(ctx)
1144 logEntry := &binarylog.ClientHeader{
1145 Header: md,
1146 MethodName: stream.Method(),
1147 PeerAddr: nil,
1148 }
1149 if deadline, ok := ctx.Deadline(); ok {
1150 logEntry.Timeout = time.Until(deadline)
1151 if logEntry.Timeout < 0 {
1152 logEntry.Timeout = 0
1153 }
1154 }
1155 if a := md[":authority"]; len(a) > 0 {
1156 logEntry.Authority = a[0]
1157 }
1158 if peer, ok := peer.FromContext(ss.Context()); ok {
1159 logEntry.PeerAddr = peer.Addr
1160 }
1161 ss.binlog.Log(logEntry)
1162 }
1163
1164 // If dc is set and matches the stream's compression, use it. Otherwise, try
1165 // to find a matching registered compressor for decomp.
1166 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1167 ss.dc = s.opts.dc
1168 } else if rc != "" && rc != encoding.Identity {
1169 ss.decomp = encoding.GetCompressor(rc)
1170 if ss.decomp == nil {
1171 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1172 t.WriteStatus(ss.s, st)
1173 return st.Err()
1174 }
1175 }
1176
1177 // If cp is set, use it. Otherwise, attempt to compress the response using
1178 // the incoming message compression method.
1179 //
1180 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1181 if s.opts.cp != nil {
1182 ss.cp = s.opts.cp
1183 stream.SetSendCompress(s.opts.cp.Type())
1184 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1185 // Legacy compressor not specified; attempt to respond with same encoding.
1186 ss.comp = encoding.GetCompressor(rc)
1187 if ss.comp != nil {
1188 stream.SetSendCompress(rc)
1189 }
1190 }
1191
1192 if trInfo != nil {
1193 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1194 defer func() {
1195 ss.mu.Lock()
1196 if err != nil && err != io.EOF {
1197 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1198 ss.trInfo.tr.SetError()
1199 }
1200 ss.trInfo.tr.Finish()
1201 ss.trInfo.tr = nil
1202 ss.mu.Unlock()
1203 }()
1204 }
1205 var appErr error
1206 var server interface{}
1207 if srv != nil {
1208 server = srv.server
1209 }
1210 if s.opts.streamInt == nil {
1211 appErr = sd.Handler(server, ss)
1212 } else {
1213 info := &StreamServerInfo{
1214 FullMethod: stream.Method(),
1215 IsClientStream: sd.ClientStreams,
1216 IsServerStream: sd.ServerStreams,
1217 }
1218 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1219 }
1220 if appErr != nil {
1221 appStatus, ok := status.FromError(appErr)
1222 if !ok {
1223 appStatus = status.New(codes.Unknown, appErr.Error())
1224 appErr = appStatus.Err()
1225 }
1226 if trInfo != nil {
1227 ss.mu.Lock()
1228 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1229 ss.trInfo.tr.SetError()
1230 ss.mu.Unlock()
1231 }
1232 t.WriteStatus(ss.s, appStatus)
1233 if ss.binlog != nil {
1234 ss.binlog.Log(&binarylog.ServerTrailer{
1235 Trailer: ss.s.Trailer(),
1236 Err: appErr,
1237 })
1238 }
1239 // TODO: Should we log an error from WriteStatus here and below?
1240 return appErr
1241 }
1242 if trInfo != nil {
1243 ss.mu.Lock()
1244 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1245 ss.mu.Unlock()
1246 }
1247 err = t.WriteStatus(ss.s, statusOK)
1248 if ss.binlog != nil {
1249 ss.binlog.Log(&binarylog.ServerTrailer{
1250 Trailer: ss.s.Trailer(),
1251 Err: appErr,
1252 })
1253 }
1254 return err
1255}
1256
1257func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1258 sm := stream.Method()
1259 if sm != "" && sm[0] == '/' {
1260 sm = sm[1:]
1261 }
1262 pos := strings.LastIndex(sm, "/")
1263 if pos == -1 {
1264 if trInfo != nil {
1265 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1266 trInfo.tr.SetError()
1267 }
1268 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1269 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1270 if trInfo != nil {
1271 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1272 trInfo.tr.SetError()
1273 }
1274 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1275 }
1276 if trInfo != nil {
1277 trInfo.tr.Finish()
1278 }
1279 return
1280 }
1281 service := sm[:pos]
1282 method := sm[pos+1:]
1283
1284 srv, knownService := s.m[service]
1285 if knownService {
1286 if md, ok := srv.md[method]; ok {
1287 s.processUnaryRPC(t, stream, srv, md, trInfo)
1288 return
1289 }
1290 if sd, ok := srv.sd[method]; ok {
1291 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1292 return
1293 }
1294 }
1295 // Unknown service, or known server unknown method.
1296 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1297 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1298 return
1299 }
1300 var errDesc string
1301 if !knownService {
1302 errDesc = fmt.Sprintf("unknown service %v", service)
1303 } else {
1304 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1305 }
1306 if trInfo != nil {
1307 trInfo.tr.LazyPrintf("%s", errDesc)
1308 trInfo.tr.SetError()
1309 }
1310 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1311 if trInfo != nil {
1312 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1313 trInfo.tr.SetError()
1314 }
1315 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1316 }
1317 if trInfo != nil {
1318 trInfo.tr.Finish()
1319 }
1320}
1321
// streamKey is the context key under which the ServerTransportStream of an
// RPC is stored. It is an unexported empty struct type, so the key cannot
// collide with context keys defined by other packages.
type streamKey struct{}
1324
1325// NewContextWithServerTransportStream creates a new context from ctx and
1326// attaches stream to it.
1327//
1328// This API is EXPERIMENTAL.
1329func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1330 return context.WithValue(ctx, streamKey{}, stream)
1331}
1332
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the method string of the RPC. The package-level Method
	// function returns this value unchanged.
	Method() string
	// SetHeader is what the package-level SetHeader calls when it is given
	// non-empty metadata; its error is returned to the caller as is.
	SetHeader(md metadata.MD) error
	// SendHeader is what the package-level SendHeader calls; a non-nil error
	// is converted with toRPCErr before being returned to the caller.
	SendHeader(md metadata.MD) error
	// SetTrailer is what the package-level SetTrailer calls when it is given
	// non-empty metadata; its error is returned to the caller as is.
	SetTrailer(md metadata.MD) error
}
1347
1348// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1349// ctx. Returns nil if the given context has no stream associated with it
1350// (which implies it is not an RPC invocation context).
1351//
1352// This API is EXPERIMENTAL.
1353func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1354 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1355 return s
1356}
1357
1358// Stop stops the gRPC server. It immediately closes all open
1359// connections and listeners.
1360// It cancels all active RPCs on the server side and the corresponding
1361// pending RPCs on the client side will get notified by connection
1362// errors.
1363func (s *Server) Stop() {
1364 s.quit.Fire()
1365
1366 defer func() {
1367 s.serveWG.Wait()
1368 s.done.Fire()
1369 }()
1370
1371 s.channelzRemoveOnce.Do(func() {
1372 if channelz.IsOn() {
1373 channelz.RemoveEntry(s.channelzID)
1374 }
1375 })
1376
1377 s.mu.Lock()
1378 listeners := s.lis
1379 s.lis = nil
1380 st := s.conns
1381 s.conns = nil
1382 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1383 s.cv.Broadcast()
1384 s.mu.Unlock()
1385
1386 for lis := range listeners {
1387 lis.Close()
1388 }
1389 for c := range st {
1390 c.Close()
1391 }
1392
1393 s.mu.Lock()
1394 if s.events != nil {
1395 s.events.Finish()
1396 s.events = nil
1397 }
1398 s.mu.Unlock()
1399}
1400
1401// GracefulStop stops the gRPC server gracefully. It stops the server from
1402// accepting new connections and RPCs and blocks until all the pending RPCs are
1403// finished.
1404func (s *Server) GracefulStop() {
1405 s.quit.Fire()
1406 defer s.done.Fire()
1407
1408 s.channelzRemoveOnce.Do(func() {
1409 if channelz.IsOn() {
1410 channelz.RemoveEntry(s.channelzID)
1411 }
1412 })
1413 s.mu.Lock()
1414 if s.conns == nil {
1415 s.mu.Unlock()
1416 return
1417 }
1418
1419 for lis := range s.lis {
1420 lis.Close()
1421 }
1422 s.lis = nil
1423 if !s.drain {
1424 for st := range s.conns {
1425 st.Drain()
1426 }
1427 s.drain = true
1428 }
1429
1430 // Wait for serving threads to be ready to exit. Only then can we be sure no
1431 // new conns will be created.
1432 s.mu.Unlock()
1433 s.serveWG.Wait()
1434 s.mu.Lock()
1435
1436 for len(s.conns) != 0 {
1437 s.cv.Wait()
1438 }
1439 s.conns = nil
1440 if s.events != nil {
1441 s.events.Finish()
1442 s.events = nil
1443 }
1444 s.mu.Unlock()
1445}
1446
1447// contentSubtype must be lowercase
1448// cannot return nil
1449func (s *Server) getCodec(contentSubtype string) baseCodec {
1450 if s.opts.codec != nil {
1451 return s.opts.codec
1452 }
1453 if contentSubtype == "" {
1454 return encoding.GetCodec(proto.Name)
1455 }
1456 codec := encoding.GetCodec(contentSubtype)
1457 if codec == nil {
1458 return encoding.GetCodec(proto.Name)
1459 }
1460 return codec
1461}
1462
1463// SetHeader sets the header metadata.
1464// When called multiple times, all the provided metadata will be merged.
1465// All the metadata will be sent out when one of the following happens:
1466// - grpc.SendHeader() is called;
1467// - The first response is sent out;
1468// - An RPC status is sent out (error or success).
1469func SetHeader(ctx context.Context, md metadata.MD) error {
1470 if md.Len() == 0 {
1471 return nil
1472 }
1473 stream := ServerTransportStreamFromContext(ctx)
1474 if stream == nil {
1475 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1476 }
1477 return stream.SetHeader(md)
1478}
1479
1480// SendHeader sends header metadata. It may be called at most once.
1481// The provided md and headers set by SetHeader() will be sent.
1482func SendHeader(ctx context.Context, md metadata.MD) error {
1483 stream := ServerTransportStreamFromContext(ctx)
1484 if stream == nil {
1485 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1486 }
1487 if err := stream.SendHeader(md); err != nil {
1488 return toRPCErr(err)
1489 }
1490 return nil
1491}
1492
1493// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1494// When called more than once, all the provided metadata will be merged.
1495func SetTrailer(ctx context.Context, md metadata.MD) error {
1496 if md.Len() == 0 {
1497 return nil
1498 }
1499 stream := ServerTransportStreamFromContext(ctx)
1500 if stream == nil {
1501 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1502 }
1503 return stream.SetTrailer(md)
1504}
1505
1506// Method returns the method string for the server context. The returned
1507// string is in the format of "/service/method".
1508func Method(ctx context.Context) (string, bool) {
1509 s := ServerTransportStreamFromContext(ctx)
1510 if s == nil {
1511 return "", false
1512 }
1513 return s.Method(), true
1514}
1515
1516type channelzServer struct {
1517 s *Server
1518}
1519
1520func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1521 return c.s.channelzMetric()
1522}