blob: 33272a47afdb4a280cb3d1c2c0e0d3318360901e [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/transport"
46 "google.golang.org/grpc/keepalive"
47 "google.golang.org/grpc/metadata"
48 "google.golang.org/grpc/peer"
49 "google.golang.org/grpc/stats"
50 "google.golang.org/grpc/status"
51 "google.golang.org/grpc/tap"
52)
53
54const (
55 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
56 defaultServerMaxSendMessageSize = math.MaxInt32
57)
58
59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
61// MethodDesc represents an RPC service's method specification.
62type MethodDesc struct {
63 MethodName string
64 Handler methodHandler
65}
66
67// ServiceDesc represents an RPC service's specification.
68type ServiceDesc struct {
69 ServiceName string
70 // The pointer to the service interface. Used to check whether the user
71 // provided implementation satisfies the interface requirements.
72 HandlerType interface{}
73 Methods []MethodDesc
74 Streams []StreamDesc
75 Metadata interface{}
76}
77
78// service consists of the information of the server serving this service and
79// the methods in this service.
80type service struct {
81 server interface{} // the server for service methods
82 md map[string]*MethodDesc
83 sd map[string]*StreamDesc
84 mdata interface{}
85}
86
87// Server is a gRPC server to serve RPC requests.
88type Server struct {
89 opts options
90
91 mu sync.Mutex // guards following
92 lis map[net.Listener]bool
93 conns map[io.Closer]bool
94 serve bool
95 drain bool
96 cv *sync.Cond // signaled when connections close for GracefulStop
97 m map[string]*service // service name -> service info
98 events trace.EventLog
99
100 quit chan struct{}
101 done chan struct{}
102 quitOnce sync.Once
103 doneOnce sync.Once
104 channelzRemoveOnce sync.Once
105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107 channelzID int64 // channelz unique identification number
108 czData *channelzData
109}
110
111type options struct {
112 creds credentials.TransportCredentials
113 codec baseCodec
114 cp Compressor
115 dc Decompressor
116 unaryInt UnaryServerInterceptor
117 streamInt StreamServerInterceptor
118 inTapHandle tap.ServerInHandle
119 statsHandler stats.Handler
120 maxConcurrentStreams uint32
121 maxReceiveMessageSize int
122 maxSendMessageSize int
123 unknownStreamDesc *StreamDesc
124 keepaliveParams keepalive.ServerParameters
125 keepalivePolicy keepalive.EnforcementPolicy
126 initialWindowSize int32
127 initialConnWindowSize int32
128 writeBufferSize int
129 readBufferSize int
130 connectionTimeout time.Duration
131 maxHeaderListSize *uint32
132}
133
134var defaultServerOptions = options{
135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
136 maxSendMessageSize: defaultServerMaxSendMessageSize,
137 connectionTimeout: 120 * time.Second,
138 writeBufferSize: defaultWriteBufSize,
139 readBufferSize: defaultReadBufSize,
140}
141
142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
143type ServerOption func(*options)
144
145// WriteBufferSize determines how much data can be batched before doing a write on the wire.
146// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
147// The default value for this buffer is 32KB.
148// Zero will disable the write buffer such that each write will be on underlying connection.
149// Note: A Send call may not directly translate to a write.
150func WriteBufferSize(s int) ServerOption {
151 return func(o *options) {
152 o.writeBufferSize = s
153 }
154}
155
156// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
157// for one read syscall.
158// The default value for this buffer is 32KB.
159// Zero will disable read buffer for a connection so data framer can access the underlying
160// conn directly.
161func ReadBufferSize(s int) ServerOption {
162 return func(o *options) {
163 o.readBufferSize = s
164 }
165}
166
167// InitialWindowSize returns a ServerOption that sets window size for stream.
168// The lower bound for window size is 64K and any value smaller than that will be ignored.
169func InitialWindowSize(s int32) ServerOption {
170 return func(o *options) {
171 o.initialWindowSize = s
172 }
173}
174
175// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
176// The lower bound for window size is 64K and any value smaller than that will be ignored.
177func InitialConnWindowSize(s int32) ServerOption {
178 return func(o *options) {
179 o.initialConnWindowSize = s
180 }
181}
182
183// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
184func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
185 if kp.Time > 0 && kp.Time < time.Second {
186 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
187 kp.Time = time.Second
188 }
189
190 return func(o *options) {
191 o.keepaliveParams = kp
192 }
193}
194
195// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
196func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
197 return func(o *options) {
198 o.keepalivePolicy = kep
199 }
200}
201
202// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
203//
204// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
205func CustomCodec(codec Codec) ServerOption {
206 return func(o *options) {
207 o.codec = codec
208 }
209}
210
211// RPCCompressor returns a ServerOption that sets a compressor for outbound
212// messages. For backward compatibility, all outbound messages will be sent
213// using this compressor, regardless of incoming message compression. By
214// default, server messages will be sent using the same compressor with which
215// request messages were sent.
216//
217// Deprecated: use encoding.RegisterCompressor instead.
218func RPCCompressor(cp Compressor) ServerOption {
219 return func(o *options) {
220 o.cp = cp
221 }
222}
223
224// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
225// messages. It has higher priority than decompressors registered via
226// encoding.RegisterCompressor.
227//
228// Deprecated: use encoding.RegisterCompressor instead.
229func RPCDecompressor(dc Decompressor) ServerOption {
230 return func(o *options) {
231 o.dc = dc
232 }
233}
234
235// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
236// If this is not set, gRPC uses the default limit.
237//
238// Deprecated: use MaxRecvMsgSize instead.
239func MaxMsgSize(m int) ServerOption {
240 return MaxRecvMsgSize(m)
241}
242
243// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
244// If this is not set, gRPC uses the default 4MB.
245func MaxRecvMsgSize(m int) ServerOption {
246 return func(o *options) {
247 o.maxReceiveMessageSize = m
248 }
249}
250
251// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
252// If this is not set, gRPC uses the default `math.MaxInt32`.
253func MaxSendMsgSize(m int) ServerOption {
254 return func(o *options) {
255 o.maxSendMessageSize = m
256 }
257}
258
259// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
260// of concurrent streams to each ServerTransport.
261func MaxConcurrentStreams(n uint32) ServerOption {
262 return func(o *options) {
263 o.maxConcurrentStreams = n
264 }
265}
266
267// Creds returns a ServerOption that sets credentials for server connections.
268func Creds(c credentials.TransportCredentials) ServerOption {
269 return func(o *options) {
270 o.creds = c
271 }
272}
273
274// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
275// server. Only one unary interceptor can be installed. The construction of multiple
276// interceptors (e.g., chaining) can be implemented at the caller.
277func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
278 return func(o *options) {
279 if o.unaryInt != nil {
280 panic("The unary server interceptor was already set and may not be reset.")
281 }
282 o.unaryInt = i
283 }
284}
285
286// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
287// server. Only one stream interceptor can be installed.
288func StreamInterceptor(i StreamServerInterceptor) ServerOption {
289 return func(o *options) {
290 if o.streamInt != nil {
291 panic("The stream server interceptor was already set and may not be reset.")
292 }
293 o.streamInt = i
294 }
295}
296
297// InTapHandle returns a ServerOption that sets the tap handle for all the server
298// transport to be created. Only one can be installed.
299func InTapHandle(h tap.ServerInHandle) ServerOption {
300 return func(o *options) {
301 if o.inTapHandle != nil {
302 panic("The tap handle was already set and may not be reset.")
303 }
304 o.inTapHandle = h
305 }
306}
307
308// StatsHandler returns a ServerOption that sets the stats handler for the server.
309func StatsHandler(h stats.Handler) ServerOption {
310 return func(o *options) {
311 o.statsHandler = h
312 }
313}
314
315// UnknownServiceHandler returns a ServerOption that allows for adding a custom
316// unknown service handler. The provided method is a bidi-streaming RPC service
317// handler that will be invoked instead of returning the "unimplemented" gRPC
318// error whenever a request is received for an unregistered service or method.
319// The handling function has full access to the Context of the request and the
320// stream, and the invocation bypasses interceptors.
321func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
322 return func(o *options) {
323 o.unknownStreamDesc = &StreamDesc{
324 StreamName: "unknown_service_handler",
325 Handler: streamHandler,
326 // We need to assume that the users of the streamHandler will want to use both.
327 ClientStreams: true,
328 ServerStreams: true,
329 }
330 }
331}
332
333// ConnectionTimeout returns a ServerOption that sets the timeout for
334// connection establishment (up to and including HTTP/2 handshaking) for all
335// new connections. If this is not set, the default is 120 seconds. A zero or
336// negative value will result in an immediate timeout.
337//
338// This API is EXPERIMENTAL.
339func ConnectionTimeout(d time.Duration) ServerOption {
340 return func(o *options) {
341 o.connectionTimeout = d
342 }
343}
344
345// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
346// of header list that the server is prepared to accept.
347func MaxHeaderListSize(s uint32) ServerOption {
348 return func(o *options) {
349 o.maxHeaderListSize = &s
350 }
351}
352
353// NewServer creates a gRPC server which has no service registered and has not
354// started to accept requests yet.
355func NewServer(opt ...ServerOption) *Server {
356 opts := defaultServerOptions
357 for _, o := range opt {
358 o(&opts)
359 }
360 s := &Server{
361 lis: make(map[net.Listener]bool),
362 opts: opts,
363 conns: make(map[io.Closer]bool),
364 m: make(map[string]*service),
365 quit: make(chan struct{}),
366 done: make(chan struct{}),
367 czData: new(channelzData),
368 }
369 s.cv = sync.NewCond(&s.mu)
370 if EnableTracing {
371 _, file, line, _ := runtime.Caller(1)
372 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
373 }
374
375 if channelz.IsOn() {
376 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
377 }
378 return s
379}
380
381// printf records an event in s's event log, unless s has been stopped.
382// REQUIRES s.mu is held.
383func (s *Server) printf(format string, a ...interface{}) {
384 if s.events != nil {
385 s.events.Printf(format, a...)
386 }
387}
388
389// errorf records an error in s's event log, unless s has been stopped.
390// REQUIRES s.mu is held.
391func (s *Server) errorf(format string, a ...interface{}) {
392 if s.events != nil {
393 s.events.Errorf(format, a...)
394 }
395}
396
397// RegisterService registers a service and its implementation to the gRPC
398// server. It is called from the IDL generated code. This must be called before
399// invoking Serve.
400func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
401 ht := reflect.TypeOf(sd.HandlerType).Elem()
402 st := reflect.TypeOf(ss)
403 if !st.Implements(ht) {
404 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
405 }
406 s.register(sd, ss)
407}
408
409func (s *Server) register(sd *ServiceDesc, ss interface{}) {
410 s.mu.Lock()
411 defer s.mu.Unlock()
412 s.printf("RegisterService(%q)", sd.ServiceName)
413 if s.serve {
414 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
415 }
416 if _, ok := s.m[sd.ServiceName]; ok {
417 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
418 }
419 srv := &service{
420 server: ss,
421 md: make(map[string]*MethodDesc),
422 sd: make(map[string]*StreamDesc),
423 mdata: sd.Metadata,
424 }
425 for i := range sd.Methods {
426 d := &sd.Methods[i]
427 srv.md[d.MethodName] = d
428 }
429 for i := range sd.Streams {
430 d := &sd.Streams[i]
431 srv.sd[d.StreamName] = d
432 }
433 s.m[sd.ServiceName] = srv
434}
435
436// MethodInfo contains the information of an RPC including its method name and type.
437type MethodInfo struct {
438 // Name is the method name only, without the service name or package name.
439 Name string
440 // IsClientStream indicates whether the RPC is a client streaming RPC.
441 IsClientStream bool
442 // IsServerStream indicates whether the RPC is a server streaming RPC.
443 IsServerStream bool
444}
445
446// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
447type ServiceInfo struct {
448 Methods []MethodInfo
449 // Metadata is the metadata specified in ServiceDesc when registering service.
450 Metadata interface{}
451}
452
453// GetServiceInfo returns a map from service names to ServiceInfo.
454// Service names include the package names, in the form of <package>.<service>.
455func (s *Server) GetServiceInfo() map[string]ServiceInfo {
456 ret := make(map[string]ServiceInfo)
457 for n, srv := range s.m {
458 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
459 for m := range srv.md {
460 methods = append(methods, MethodInfo{
461 Name: m,
462 IsClientStream: false,
463 IsServerStream: false,
464 })
465 }
466 for m, d := range srv.sd {
467 methods = append(methods, MethodInfo{
468 Name: m,
469 IsClientStream: d.ClientStreams,
470 IsServerStream: d.ServerStreams,
471 })
472 }
473
474 ret[n] = ServiceInfo{
475 Methods: methods,
476 Metadata: srv.mdata,
477 }
478 }
479 return ret
480}
481
482// ErrServerStopped indicates that the operation is now illegal because of
483// the server being stopped.
484var ErrServerStopped = errors.New("grpc: the server has been stopped")
485
486func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
487 if s.opts.creds == nil {
488 return rawConn, nil, nil
489 }
490 return s.opts.creds.ServerHandshake(rawConn)
491}
492
493type listenSocket struct {
494 net.Listener
495 channelzID int64
496}
497
498func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
499 return &channelz.SocketInternalMetric{
500 SocketOptions: channelz.GetSocketOption(l.Listener),
501 LocalAddr: l.Listener.Addr(),
502 }
503}
504
505func (l *listenSocket) Close() error {
506 err := l.Listener.Close()
507 if channelz.IsOn() {
508 channelz.RemoveEntry(l.channelzID)
509 }
510 return err
511}
512
513// Serve accepts incoming connections on the listener lis, creating a new
514// ServerTransport and service goroutine for each. The service goroutines
515// read gRPC requests and then call the registered handlers to reply to them.
516// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
517// this method returns.
518// Serve will return a non-nil error unless Stop or GracefulStop is called.
519func (s *Server) Serve(lis net.Listener) error {
520 s.mu.Lock()
521 s.printf("serving")
522 s.serve = true
523 if s.lis == nil {
524 // Serve called after Stop or GracefulStop.
525 s.mu.Unlock()
526 lis.Close()
527 return ErrServerStopped
528 }
529
530 s.serveWG.Add(1)
531 defer func() {
532 s.serveWG.Done()
533 select {
534 // Stop or GracefulStop called; block until done and return nil.
535 case <-s.quit:
536 <-s.done
537 default:
538 }
539 }()
540
541 ls := &listenSocket{Listener: lis}
542 s.lis[ls] = true
543
544 if channelz.IsOn() {
545 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
546 }
547 s.mu.Unlock()
548
549 defer func() {
550 s.mu.Lock()
551 if s.lis != nil && s.lis[ls] {
552 ls.Close()
553 delete(s.lis, ls)
554 }
555 s.mu.Unlock()
556 }()
557
558 var tempDelay time.Duration // how long to sleep on accept failure
559
560 for {
561 rawConn, err := lis.Accept()
562 if err != nil {
563 if ne, ok := err.(interface {
564 Temporary() bool
565 }); ok && ne.Temporary() {
566 if tempDelay == 0 {
567 tempDelay = 5 * time.Millisecond
568 } else {
569 tempDelay *= 2
570 }
571 if max := 1 * time.Second; tempDelay > max {
572 tempDelay = max
573 }
574 s.mu.Lock()
575 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
576 s.mu.Unlock()
577 timer := time.NewTimer(tempDelay)
578 select {
579 case <-timer.C:
580 case <-s.quit:
581 timer.Stop()
582 return nil
583 }
584 continue
585 }
586 s.mu.Lock()
587 s.printf("done serving; Accept = %v", err)
588 s.mu.Unlock()
589
590 select {
591 case <-s.quit:
592 return nil
593 default:
594 }
595 return err
596 }
597 tempDelay = 0
598 // Start a new goroutine to deal with rawConn so we don't stall this Accept
599 // loop goroutine.
600 //
601 // Make sure we account for the goroutine so GracefulStop doesn't nil out
602 // s.conns before this conn can be added.
603 s.serveWG.Add(1)
604 go func() {
605 s.handleRawConn(rawConn)
606 s.serveWG.Done()
607 }()
608 }
609}
610
611// handleRawConn forks a goroutine to handle a just-accepted connection that
612// has not had any I/O performed on it yet.
613func (s *Server) handleRawConn(rawConn net.Conn) {
614 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
615 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
616 if err != nil {
617 s.mu.Lock()
618 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
619 s.mu.Unlock()
620 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
621 // If serverHandshake returns ErrConnDispatched, keep rawConn open.
622 if err != credentials.ErrConnDispatched {
623 rawConn.Close()
624 }
625 rawConn.SetDeadline(time.Time{})
626 return
627 }
628
629 s.mu.Lock()
630 if s.conns == nil {
631 s.mu.Unlock()
632 conn.Close()
633 return
634 }
635 s.mu.Unlock()
636
637 // Finish handshaking (HTTP2)
638 st := s.newHTTP2Transport(conn, authInfo)
639 if st == nil {
640 return
641 }
642
643 rawConn.SetDeadline(time.Time{})
644 if !s.addConn(st) {
645 return
646 }
647 go func() {
648 s.serveStreams(st)
649 s.removeConn(st)
650 }()
651}
652
653// newHTTP2Transport sets up a http/2 transport (using the
654// gRPC http2 server transport in transport/http2_server.go).
655func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
656 config := &transport.ServerConfig{
657 MaxStreams: s.opts.maxConcurrentStreams,
658 AuthInfo: authInfo,
659 InTapHandle: s.opts.inTapHandle,
660 StatsHandler: s.opts.statsHandler,
661 KeepaliveParams: s.opts.keepaliveParams,
662 KeepalivePolicy: s.opts.keepalivePolicy,
663 InitialWindowSize: s.opts.initialWindowSize,
664 InitialConnWindowSize: s.opts.initialConnWindowSize,
665 WriteBufferSize: s.opts.writeBufferSize,
666 ReadBufferSize: s.opts.readBufferSize,
667 ChannelzParentID: s.channelzID,
668 MaxHeaderListSize: s.opts.maxHeaderListSize,
669 }
670 st, err := transport.NewServerTransport("http2", c, config)
671 if err != nil {
672 s.mu.Lock()
673 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
674 s.mu.Unlock()
675 c.Close()
676 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
677 return nil
678 }
679
680 return st
681}
682
683func (s *Server) serveStreams(st transport.ServerTransport) {
684 defer st.Close()
685 var wg sync.WaitGroup
686 st.HandleStreams(func(stream *transport.Stream) {
687 wg.Add(1)
688 go func() {
689 defer wg.Done()
690 s.handleStream(st, stream, s.traceInfo(st, stream))
691 }()
692 }, func(ctx context.Context, method string) context.Context {
693 if !EnableTracing {
694 return ctx
695 }
696 tr := trace.New("grpc.Recv."+methodFamily(method), method)
697 return trace.NewContext(ctx, tr)
698 })
699 wg.Wait()
700}
701
702var _ http.Handler = (*Server)(nil)
703
704// ServeHTTP implements the Go standard library's http.Handler
705// interface by responding to the gRPC request r, by looking up
706// the requested gRPC method in the gRPC server s.
707//
708// The provided HTTP request must have arrived on an HTTP/2
709// connection. When using the Go standard library's server,
710// practically this means that the Request must also have arrived
711// over TLS.
712//
713// To share one port (such as 443 for https) between gRPC and an
714// existing http.Handler, use a root http.Handler such as:
715//
716// if r.ProtoMajor == 2 && strings.HasPrefix(
717// r.Header.Get("Content-Type"), "application/grpc") {
718// grpcServer.ServeHTTP(w, r)
719// } else {
720// yourMux.ServeHTTP(w, r)
721// }
722//
723// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
724// separate from grpc-go's HTTP/2 server. Performance and features may vary
725// between the two paths. ServeHTTP does not support some gRPC features
726// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
727// and subject to change.
728func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
729 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
730 if err != nil {
731 http.Error(w, err.Error(), http.StatusInternalServerError)
732 return
733 }
734 if !s.addConn(st) {
735 return
736 }
737 defer s.removeConn(st)
738 s.serveStreams(st)
739}
740
741// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
742// If tracing is not enabled, it returns nil.
743func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
744 tr, ok := trace.FromContext(stream.Context())
745 if !ok {
746 return nil
747 }
748
749 trInfo = &traceInfo{
750 tr: tr,
751 }
752 trInfo.firstLine.client = false
753 trInfo.firstLine.remoteAddr = st.RemoteAddr()
754
755 if dl, ok := stream.Context().Deadline(); ok {
756 trInfo.firstLine.deadline = time.Until(dl)
757 }
758 return trInfo
759}
760
761func (s *Server) addConn(c io.Closer) bool {
762 s.mu.Lock()
763 defer s.mu.Unlock()
764 if s.conns == nil {
765 c.Close()
766 return false
767 }
768 if s.drain {
769 // Transport added after we drained our existing conns: drain it
770 // immediately.
771 c.(transport.ServerTransport).Drain()
772 }
773 s.conns[c] = true
774 return true
775}
776
777func (s *Server) removeConn(c io.Closer) {
778 s.mu.Lock()
779 defer s.mu.Unlock()
780 if s.conns != nil {
781 delete(s.conns, c)
782 s.cv.Broadcast()
783 }
784}
785
786func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
787 return &channelz.ServerInternalMetric{
788 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
789 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
790 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
791 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
792 }
793}
794
795func (s *Server) incrCallsStarted() {
796 atomic.AddInt64(&s.czData.callsStarted, 1)
797 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
798}
799
800func (s *Server) incrCallsSucceeded() {
801 atomic.AddInt64(&s.czData.callsSucceeded, 1)
802}
803
804func (s *Server) incrCallsFailed() {
805 atomic.AddInt64(&s.czData.callsFailed, 1)
806}
807
808func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
809 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
810 if err != nil {
811 grpclog.Errorln("grpc: server failed to encode response: ", err)
812 return err
813 }
814 compData, err := compress(data, cp, comp)
815 if err != nil {
816 grpclog.Errorln("grpc: server failed to compress response: ", err)
817 return err
818 }
819 hdr, payload := msgHeader(data, compData)
820 // TODO(dfawley): should we be checking len(data) instead?
821 if len(payload) > s.opts.maxSendMessageSize {
822 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
823 }
824 err = t.Write(stream, hdr, payload, opts)
825 if err == nil && s.opts.statsHandler != nil {
826 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
827 }
828 return err
829}
830
831func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
832 if channelz.IsOn() {
833 s.incrCallsStarted()
834 defer func() {
835 if err != nil && err != io.EOF {
836 s.incrCallsFailed()
837 } else {
838 s.incrCallsSucceeded()
839 }
840 }()
841 }
842 sh := s.opts.statsHandler
843 if sh != nil {
844 beginTime := time.Now()
845 begin := &stats.Begin{
846 BeginTime: beginTime,
847 }
848 sh.HandleRPC(stream.Context(), begin)
849 defer func() {
850 end := &stats.End{
851 BeginTime: beginTime,
852 EndTime: time.Now(),
853 }
854 if err != nil && err != io.EOF {
855 end.Error = toRPCErr(err)
856 }
857 sh.HandleRPC(stream.Context(), end)
858 }()
859 }
860 if trInfo != nil {
861 defer trInfo.tr.Finish()
862 trInfo.firstLine.client = false
863 trInfo.tr.LazyLog(&trInfo.firstLine, false)
864 defer func() {
865 if err != nil && err != io.EOF {
866 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
867 trInfo.tr.SetError()
868 }
869 }()
870 }
871
872 binlog := binarylog.GetMethodLogger(stream.Method())
873 if binlog != nil {
874 ctx := stream.Context()
875 md, _ := metadata.FromIncomingContext(ctx)
876 logEntry := &binarylog.ClientHeader{
877 Header: md,
878 MethodName: stream.Method(),
879 PeerAddr: nil,
880 }
881 if deadline, ok := ctx.Deadline(); ok {
882 logEntry.Timeout = time.Until(deadline)
883 if logEntry.Timeout < 0 {
884 logEntry.Timeout = 0
885 }
886 }
887 if a := md[":authority"]; len(a) > 0 {
888 logEntry.Authority = a[0]
889 }
890 if peer, ok := peer.FromContext(ctx); ok {
891 logEntry.PeerAddr = peer.Addr
892 }
893 binlog.Log(logEntry)
894 }
895
896 // comp and cp are used for compression. decomp and dc are used for
897 // decompression. If comp and decomp are both set, they are the same;
898 // however they are kept separate to ensure that at most one of the
899 // compressor/decompressor variable pairs are set for use later.
900 var comp, decomp encoding.Compressor
901 var cp Compressor
902 var dc Decompressor
903
904 // If dc is set and matches the stream's compression, use it. Otherwise, try
905 // to find a matching registered compressor for decomp.
906 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
907 dc = s.opts.dc
908 } else if rc != "" && rc != encoding.Identity {
909 decomp = encoding.GetCompressor(rc)
910 if decomp == nil {
911 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
912 t.WriteStatus(stream, st)
913 return st.Err()
914 }
915 }
916
917 // If cp is set, use it. Otherwise, attempt to compress the response using
918 // the incoming message compression method.
919 //
920 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
921 if s.opts.cp != nil {
922 cp = s.opts.cp
923 stream.SetSendCompress(cp.Type())
924 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
925 // Legacy compressor not specified; attempt to respond with same encoding.
926 comp = encoding.GetCompressor(rc)
927 if comp != nil {
928 stream.SetSendCompress(rc)
929 }
930 }
931
932 var payInfo *payloadInfo
933 if sh != nil || binlog != nil {
934 payInfo = &payloadInfo{}
935 }
936 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
937 if err != nil {
938 if st, ok := status.FromError(err); ok {
939 if e := t.WriteStatus(stream, st); e != nil {
940 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
941 }
942 }
943 return err
944 }
945 if channelz.IsOn() {
946 t.IncrMsgRecv()
947 }
948 df := func(v interface{}) error {
949 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
950 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
951 }
952 if sh != nil {
953 sh.HandleRPC(stream.Context(), &stats.InPayload{
954 RecvTime: time.Now(),
955 Payload: v,
956 Data: d,
957 Length: len(d),
958 })
959 }
960 if binlog != nil {
961 binlog.Log(&binarylog.ClientMessage{
962 Message: d,
963 })
964 }
965 if trInfo != nil {
966 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
967 }
968 return nil
969 }
970 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
971 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
972 if appErr != nil {
973 appStatus, ok := status.FromError(appErr)
974 if !ok {
975 // Convert appErr if it is not a grpc status error.
976 appErr = status.Error(codes.Unknown, appErr.Error())
977 appStatus, _ = status.FromError(appErr)
978 }
979 if trInfo != nil {
980 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
981 trInfo.tr.SetError()
982 }
983 if e := t.WriteStatus(stream, appStatus); e != nil {
984 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
985 }
986 if binlog != nil {
987 if h, _ := stream.Header(); h.Len() > 0 {
988 // Only log serverHeader if there was header. Otherwise it can
989 // be trailer only.
990 binlog.Log(&binarylog.ServerHeader{
991 Header: h,
992 })
993 }
994 binlog.Log(&binarylog.ServerTrailer{
995 Trailer: stream.Trailer(),
996 Err: appErr,
997 })
998 }
999 return appErr
1000 }
1001 if trInfo != nil {
1002 trInfo.tr.LazyLog(stringer("OK"), false)
1003 }
1004 opts := &transport.Options{Last: true}
1005
1006 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1007 if err == io.EOF {
1008 // The entire stream is done (for unary RPC only).
1009 return err
1010 }
1011 if s, ok := status.FromError(err); ok {
1012 if e := t.WriteStatus(stream, s); e != nil {
1013 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1014 }
1015 } else {
1016 switch st := err.(type) {
1017 case transport.ConnectionError:
1018 // Nothing to do here.
1019 default:
1020 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1021 }
1022 }
1023 if binlog != nil {
1024 h, _ := stream.Header()
1025 binlog.Log(&binarylog.ServerHeader{
1026 Header: h,
1027 })
1028 binlog.Log(&binarylog.ServerTrailer{
1029 Trailer: stream.Trailer(),
1030 Err: appErr,
1031 })
1032 }
1033 return err
1034 }
1035 if binlog != nil {
1036 h, _ := stream.Header()
1037 binlog.Log(&binarylog.ServerHeader{
1038 Header: h,
1039 })
1040 binlog.Log(&binarylog.ServerMessage{
1041 Message: reply,
1042 })
1043 }
1044 if channelz.IsOn() {
1045 t.IncrMsgSent()
1046 }
1047 if trInfo != nil {
1048 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1049 }
1050 // TODO: Should we be logging if writing status failed here, like above?
1051 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1052 // error or allow the stats handler to see it?
1053 err = t.WriteStatus(stream, status.New(codes.OK, ""))
1054 if binlog != nil {
1055 binlog.Log(&binarylog.ServerTrailer{
1056 Trailer: stream.Trailer(),
1057 Err: appErr,
1058 })
1059 }
1060 return err
1061}
1062
1063func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1064 if channelz.IsOn() {
1065 s.incrCallsStarted()
1066 defer func() {
1067 if err != nil && err != io.EOF {
1068 s.incrCallsFailed()
1069 } else {
1070 s.incrCallsSucceeded()
1071 }
1072 }()
1073 }
1074 sh := s.opts.statsHandler
1075 if sh != nil {
1076 beginTime := time.Now()
1077 begin := &stats.Begin{
1078 BeginTime: beginTime,
1079 }
1080 sh.HandleRPC(stream.Context(), begin)
1081 defer func() {
1082 end := &stats.End{
1083 BeginTime: beginTime,
1084 EndTime: time.Now(),
1085 }
1086 if err != nil && err != io.EOF {
1087 end.Error = toRPCErr(err)
1088 }
1089 sh.HandleRPC(stream.Context(), end)
1090 }()
1091 }
1092 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1093 ss := &serverStream{
1094 ctx: ctx,
1095 t: t,
1096 s: stream,
1097 p: &parser{r: stream},
1098 codec: s.getCodec(stream.ContentSubtype()),
1099 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1100 maxSendMessageSize: s.opts.maxSendMessageSize,
1101 trInfo: trInfo,
1102 statsHandler: sh,
1103 }
1104
1105 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1106 if ss.binlog != nil {
1107 md, _ := metadata.FromIncomingContext(ctx)
1108 logEntry := &binarylog.ClientHeader{
1109 Header: md,
1110 MethodName: stream.Method(),
1111 PeerAddr: nil,
1112 }
1113 if deadline, ok := ctx.Deadline(); ok {
1114 logEntry.Timeout = time.Until(deadline)
1115 if logEntry.Timeout < 0 {
1116 logEntry.Timeout = 0
1117 }
1118 }
1119 if a := md[":authority"]; len(a) > 0 {
1120 logEntry.Authority = a[0]
1121 }
1122 if peer, ok := peer.FromContext(ss.Context()); ok {
1123 logEntry.PeerAddr = peer.Addr
1124 }
1125 ss.binlog.Log(logEntry)
1126 }
1127
1128 // If dc is set and matches the stream's compression, use it. Otherwise, try
1129 // to find a matching registered compressor for decomp.
1130 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1131 ss.dc = s.opts.dc
1132 } else if rc != "" && rc != encoding.Identity {
1133 ss.decomp = encoding.GetCompressor(rc)
1134 if ss.decomp == nil {
1135 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1136 t.WriteStatus(ss.s, st)
1137 return st.Err()
1138 }
1139 }
1140
1141 // If cp is set, use it. Otherwise, attempt to compress the response using
1142 // the incoming message compression method.
1143 //
1144 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1145 if s.opts.cp != nil {
1146 ss.cp = s.opts.cp
1147 stream.SetSendCompress(s.opts.cp.Type())
1148 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1149 // Legacy compressor not specified; attempt to respond with same encoding.
1150 ss.comp = encoding.GetCompressor(rc)
1151 if ss.comp != nil {
1152 stream.SetSendCompress(rc)
1153 }
1154 }
1155
1156 if trInfo != nil {
1157 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1158 defer func() {
1159 ss.mu.Lock()
1160 if err != nil && err != io.EOF {
1161 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1162 ss.trInfo.tr.SetError()
1163 }
1164 ss.trInfo.tr.Finish()
1165 ss.trInfo.tr = nil
1166 ss.mu.Unlock()
1167 }()
1168 }
1169 var appErr error
1170 var server interface{}
1171 if srv != nil {
1172 server = srv.server
1173 }
1174 if s.opts.streamInt == nil {
1175 appErr = sd.Handler(server, ss)
1176 } else {
1177 info := &StreamServerInfo{
1178 FullMethod: stream.Method(),
1179 IsClientStream: sd.ClientStreams,
1180 IsServerStream: sd.ServerStreams,
1181 }
1182 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1183 }
1184 if appErr != nil {
1185 appStatus, ok := status.FromError(appErr)
1186 if !ok {
1187 appStatus = status.New(codes.Unknown, appErr.Error())
1188 appErr = appStatus.Err()
1189 }
1190 if trInfo != nil {
1191 ss.mu.Lock()
1192 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1193 ss.trInfo.tr.SetError()
1194 ss.mu.Unlock()
1195 }
1196 t.WriteStatus(ss.s, appStatus)
1197 if ss.binlog != nil {
1198 ss.binlog.Log(&binarylog.ServerTrailer{
1199 Trailer: ss.s.Trailer(),
1200 Err: appErr,
1201 })
1202 }
1203 // TODO: Should we log an error from WriteStatus here and below?
1204 return appErr
1205 }
1206 if trInfo != nil {
1207 ss.mu.Lock()
1208 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1209 ss.mu.Unlock()
1210 }
1211 err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
1212 if ss.binlog != nil {
1213 ss.binlog.Log(&binarylog.ServerTrailer{
1214 Trailer: ss.s.Trailer(),
1215 Err: appErr,
1216 })
1217 }
1218 return err
1219}
1220
1221func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1222 sm := stream.Method()
1223 if sm != "" && sm[0] == '/' {
1224 sm = sm[1:]
1225 }
1226 pos := strings.LastIndex(sm, "/")
1227 if pos == -1 {
1228 if trInfo != nil {
1229 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1230 trInfo.tr.SetError()
1231 }
1232 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1233 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1234 if trInfo != nil {
1235 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1236 trInfo.tr.SetError()
1237 }
1238 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1239 }
1240 if trInfo != nil {
1241 trInfo.tr.Finish()
1242 }
1243 return
1244 }
1245 service := sm[:pos]
1246 method := sm[pos+1:]
1247
1248 if srv, ok := s.m[service]; ok {
1249 if md, ok := srv.md[method]; ok {
1250 s.processUnaryRPC(t, stream, srv, md, trInfo)
1251 return
1252 }
1253 if sd, ok := srv.sd[method]; ok {
1254 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1255 return
1256 }
1257 }
1258 // Unknown service, or known server unknown method.
1259 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1260 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1261 return
1262 }
1263 if trInfo != nil {
1264 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1265 trInfo.tr.SetError()
1266 }
1267 errDesc := fmt.Sprintf("unknown service %v", service)
1268 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1269 if trInfo != nil {
1270 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1271 trInfo.tr.SetError()
1272 }
1273 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1274 }
1275 if trInfo != nil {
1276 trInfo.tr.Finish()
1277 }
1278}
1279
1280// The key to save ServerTransportStream in the context.
1281type streamKey struct{}
1282
1283// NewContextWithServerTransportStream creates a new context from ctx and
1284// attaches stream to it.
1285//
1286// This API is EXPERIMENTAL.
1287func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1288 return context.WithValue(ctx, streamKey{}, stream)
1289}
1290
1291// ServerTransportStream is a minimal interface that a transport stream must
1292// implement. This can be used to mock an actual transport stream for tests of
1293// handler code that use, for example, grpc.SetHeader (which requires some
1294// stream to be in context).
1295//
1296// See also NewContextWithServerTransportStream.
1297//
1298// This API is EXPERIMENTAL.
1299type ServerTransportStream interface {
1300 Method() string
1301 SetHeader(md metadata.MD) error
1302 SendHeader(md metadata.MD) error
1303 SetTrailer(md metadata.MD) error
1304}
1305
1306// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1307// ctx. Returns nil if the given context has no stream associated with it
1308// (which implies it is not an RPC invocation context).
1309//
1310// This API is EXPERIMENTAL.
1311func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1312 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1313 return s
1314}
1315
1316// Stop stops the gRPC server. It immediately closes all open
1317// connections and listeners.
1318// It cancels all active RPCs on the server side and the corresponding
1319// pending RPCs on the client side will get notified by connection
1320// errors.
1321func (s *Server) Stop() {
1322 s.quitOnce.Do(func() {
1323 close(s.quit)
1324 })
1325
1326 defer func() {
1327 s.serveWG.Wait()
1328 s.doneOnce.Do(func() {
1329 close(s.done)
1330 })
1331 }()
1332
1333 s.channelzRemoveOnce.Do(func() {
1334 if channelz.IsOn() {
1335 channelz.RemoveEntry(s.channelzID)
1336 }
1337 })
1338
1339 s.mu.Lock()
1340 listeners := s.lis
1341 s.lis = nil
1342 st := s.conns
1343 s.conns = nil
1344 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1345 s.cv.Broadcast()
1346 s.mu.Unlock()
1347
1348 for lis := range listeners {
1349 lis.Close()
1350 }
1351 for c := range st {
1352 c.Close()
1353 }
1354
1355 s.mu.Lock()
1356 if s.events != nil {
1357 s.events.Finish()
1358 s.events = nil
1359 }
1360 s.mu.Unlock()
1361}
1362
1363// GracefulStop stops the gRPC server gracefully. It stops the server from
1364// accepting new connections and RPCs and blocks until all the pending RPCs are
1365// finished.
1366func (s *Server) GracefulStop() {
1367 s.quitOnce.Do(func() {
1368 close(s.quit)
1369 })
1370
1371 defer func() {
1372 s.doneOnce.Do(func() {
1373 close(s.done)
1374 })
1375 }()
1376
1377 s.channelzRemoveOnce.Do(func() {
1378 if channelz.IsOn() {
1379 channelz.RemoveEntry(s.channelzID)
1380 }
1381 })
1382 s.mu.Lock()
1383 if s.conns == nil {
1384 s.mu.Unlock()
1385 return
1386 }
1387
1388 for lis := range s.lis {
1389 lis.Close()
1390 }
1391 s.lis = nil
1392 if !s.drain {
1393 for c := range s.conns {
1394 c.(transport.ServerTransport).Drain()
1395 }
1396 s.drain = true
1397 }
1398
1399 // Wait for serving threads to be ready to exit. Only then can we be sure no
1400 // new conns will be created.
1401 s.mu.Unlock()
1402 s.serveWG.Wait()
1403 s.mu.Lock()
1404
1405 for len(s.conns) != 0 {
1406 s.cv.Wait()
1407 }
1408 s.conns = nil
1409 if s.events != nil {
1410 s.events.Finish()
1411 s.events = nil
1412 }
1413 s.mu.Unlock()
1414}
1415
1416// contentSubtype must be lowercase
1417// cannot return nil
1418func (s *Server) getCodec(contentSubtype string) baseCodec {
1419 if s.opts.codec != nil {
1420 return s.opts.codec
1421 }
1422 if contentSubtype == "" {
1423 return encoding.GetCodec(proto.Name)
1424 }
1425 codec := encoding.GetCodec(contentSubtype)
1426 if codec == nil {
1427 return encoding.GetCodec(proto.Name)
1428 }
1429 return codec
1430}
1431
1432// SetHeader sets the header metadata.
1433// When called multiple times, all the provided metadata will be merged.
1434// All the metadata will be sent out when one of the following happens:
1435// - grpc.SendHeader() is called;
1436// - The first response is sent out;
1437// - An RPC status is sent out (error or success).
1438func SetHeader(ctx context.Context, md metadata.MD) error {
1439 if md.Len() == 0 {
1440 return nil
1441 }
1442 stream := ServerTransportStreamFromContext(ctx)
1443 if stream == nil {
1444 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1445 }
1446 return stream.SetHeader(md)
1447}
1448
1449// SendHeader sends header metadata. It may be called at most once.
1450// The provided md and headers set by SetHeader() will be sent.
1451func SendHeader(ctx context.Context, md metadata.MD) error {
1452 stream := ServerTransportStreamFromContext(ctx)
1453 if stream == nil {
1454 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1455 }
1456 if err := stream.SendHeader(md); err != nil {
1457 return toRPCErr(err)
1458 }
1459 return nil
1460}
1461
1462// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1463// When called more than once, all the provided metadata will be merged.
1464func SetTrailer(ctx context.Context, md metadata.MD) error {
1465 if md.Len() == 0 {
1466 return nil
1467 }
1468 stream := ServerTransportStreamFromContext(ctx)
1469 if stream == nil {
1470 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1471 }
1472 return stream.SetTrailer(md)
1473}
1474
1475// Method returns the method string for the server context. The returned
1476// string is in the format of "/service/method".
1477func Method(ctx context.Context) (string, bool) {
1478 s := ServerTransportStreamFromContext(ctx)
1479 if s == nil {
1480 return "", false
1481 }
1482 return s.Method(), true
1483}
1484
1485type channelzServer struct {
1486 s *Server
1487}
1488
1489func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1490 return c.s.channelzMetric()
1491}