/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/trace"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultServerMaxSendMessageSize    = math.MaxInt32
)

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	MethodName string
	Handler    methodHandler
}

// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	Methods     []MethodDesc
	Streams     []StreamDesc
	Metadata    interface{}
}

// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc
	sd     map[string]*StreamDesc
	mdata  interface{}
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts options

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool
	conns  map[io.Closer]bool
	serve  bool
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	events trace.EventLog

	quit               chan struct{}
	done               chan struct{}
	quitOnce           sync.Once
	doneOnce           sync.Once
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

type options struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32
}

var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)

// WriteBufferSize determines how much data can be batched before doing a write on the wire.
// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
// The default value for this buffer is 32KB.
// Setting it to zero disables the write buffer, so each write goes directly to the underlying connection.
// Note: A Send call may not directly translate to a write.
func WriteBufferSize(s int) ServerOption {
	return func(o *options) {
		o.writeBufferSize = s
	}
}

// ReadBufferSize lets you set the size of the read buffer; this determines how much data can
// be read at most in one read syscall.
// The default value for this buffer is 32KB.
// Setting it to zero disables the read buffer for a connection, so the data framer can access the
// underlying conn directly.
func ReadBufferSize(s int) ServerOption {
	return func(o *options) {
		o.readBufferSize = s
	}
}

// InitialWindowSize returns a ServerOption that sets the window size for a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialWindowSize = s
	}
}

// InitialConnWindowSize returns a ServerOption that sets the window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialConnWindowSize = s
	}
}

// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
	return func(o *options) {
		o.keepaliveParams = kp
	}
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets the keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
	return func(o *options) {
		o.keepalivePolicy = kep
	}
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
	return func(o *options) {
		o.codec = codec
	}
}

// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
// default, server messages will be sent using the same compressor with which
// request messages were sent.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
	return func(o *options) {
		o.cp = cp
	}
}

// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
// messages. It has higher priority than decompressors registered via
// encoding.RegisterCompressor.
//
// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
	return func(o *options) {
		o.dc = dc
	}
}

// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
//
// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}

// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default 4MB.
func MaxRecvMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxReceiveMessageSize = m
	}
}

// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default 4MB.
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}

// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
	return func(o *options) {
		o.maxConcurrentStreams = n
	}
}

// Creds returns a ServerOption that sets credentials for server connections.
func Creds(c credentials.TransportCredentials) ServerOption {
	return func(o *options) {
		o.creds = c
	}
}

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	}
}
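
// A minimal sketch of how a caller might install a logging interceptor; the
// loggingInterceptor name and its behavior are illustrative assumptions, not
// part of this package:
//
//	func loggingInterceptor(ctx context.Context, req interface{},
//		info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//		start := time.Now()
//		resp, err := handler(ctx, req) // invoke the actual RPC handler
//		grpclog.Infof("method=%s duration=%s err=%v", info.FullMethod, time.Since(start), err)
//		return resp, err
//	}
//
//	s := grpc.NewServer(grpc.UnaryInterceptor(loggingInterceptor))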

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return func(o *options) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	}
}

// InTapHandle returns a ServerOption that sets the tap handle for all server
// transports to be created. Only one can be installed.
func InTapHandle(h tap.ServerInHandle) ServerOption {
	return func(o *options) {
		if o.inTapHandle != nil {
			panic("The tap handle was already set and may not be reset.")
		}
		o.inTapHandle = h
	}
}

// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
	return func(o *options) {
		o.statsHandler = h
	}
}
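
// A hedged sketch of a stats.Handler implementation a caller might pass to
// StatsHandler; the serverStats type and what it logs are assumptions made
// for illustration only:
//
//	type serverStats struct{}
//
//	func (serverStats) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { return ctx }
//	func (serverStats) HandleRPC(ctx context.Context, s stats.RPCStats) {
//		// Log only RPC completions; other RPCStats events are ignored here.
//		if end, ok := s.(*stats.End); ok {
//			grpclog.Infof("RPC finished in %s, err=%v", end.EndTime.Sub(end.BeginTime), end.Error)
//		}
//	}
//	func (serverStats) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
//	func (serverStats) HandleConn(context.Context, stats.ConnStats) {}
//
//	s := grpc.NewServer(grpc.StatsHandler(serverStats{}))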

// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
	return func(o *options) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	}
}
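
// A sketch of a catch-all handler a caller might pass to UnknownServiceHandler,
// for example as the entry point of a generic proxy; fallbackHandler and its
// behavior are illustrative assumptions, not part of this file:
//
//	func fallbackHandler(srv interface{}, stream grpc.ServerStream) error {
//		fullMethod, _ := grpc.MethodFromServerStream(stream)
//		grpclog.Warningf("no registered handler for %q", fullMethod)
//		return status.Errorf(codes.Unimplemented, "method %q not implemented", fullMethod)
//	}
//
//	s := grpc.NewServer(grpc.UnknownServiceHandler(fallbackHandler))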

// ConnectionTimeout returns a ServerOption that sets the timeout for
// connection establishment (up to and including HTTP/2 handshaking) for all
// new connections. If this is not set, the default is 120 seconds. A zero or
// negative value will result in an immediate timeout.
//
// This API is EXPERIMENTAL.
func ConnectionTimeout(d time.Duration) ServerOption {
	return func(o *options) {
		o.connectionTimeout = d
	}
}

// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
// of header list that the server is prepared to accept.
func MaxHeaderListSize(s uint32) ServerOption {
	return func(o *options) {
		o.maxHeaderListSize = &s
	}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o(&opts)
	}
	s := &Server{
		lis:    make(map[net.Listener]bool),
		opts:   opts,
		conns:  make(map[io.Closer]bool),
		m:      make(map[string]*service),
		quit:   make(chan struct{}),
		done:   make(chan struct{}),
		czData: new(channelzData),
	}
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}
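
// A typical construction sequence, shown as a hedged sketch; the listener
// address, pb.RegisterGreeterServer and greeterServer are placeholders for
// generated and user code, not defined here:
//
//	lis, err := net.Listen("tcp", ":50051")
//	if err != nil {
//		log.Fatalf("failed to listen: %v", err)
//	}
//	s := grpc.NewServer(
//		grpc.MaxRecvMsgSize(8*1024*1024),
//		grpc.ConnectionTimeout(30*time.Second),
//	)
//	pb.RegisterGreeterServer(s, &greeterServer{})
//	if err := s.Serve(lis); err != nil {
//		log.Fatalf("failed to serve: %v", err)
//	}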

// printf records an event in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) printf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Printf(format, a...)
	}
}

// errorf records an error in s's event log, unless s has been stopped.
// REQUIRES s.mu is held.
func (s *Server) errorf(format string, a ...interface{}) {
	if s.events != nil {
		s.events.Errorf(format, a...)
	}
}

// RegisterService registers a service and its implementation to the gRPC
// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
	ht := reflect.TypeOf(sd.HandlerType).Elem()
	st := reflect.TypeOf(ss)
	if !st.Implements(ht) {
		grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
	}
	s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}

// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}

// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
	ret := make(map[string]ServiceInfo)
	for n, srv := range s.m {
		methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
		for m := range srv.md {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: false,
				IsServerStream: false,
			})
		}
		for m, d := range srv.sd {
			methods = append(methods, MethodInfo{
				Name:           m,
				IsClientStream: d.ClientStreams,
				IsServerStream: d.ServerStreams,
			})
		}

		ret[n] = ServiceInfo{
			Methods:  methods,
			Metadata: srv.mdata,
		}
	}
	return ret
}

// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")

func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}

type listenSocket struct {
	net.Listener
	channelzID int64
}

func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
	return &channelz.SocketInternalMetric{
		SocketOptions: channelz.GetSocketOption(l.Listener),
		LocalAddr:     l.Listener.Addr(),
	}
}

func (l *listenSocket) Close() error {
	err := l.Listener.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(l.channelzID)
	}
	return err
}

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//	if r.ProtoMajor == 2 && strings.HasPrefix(
//		r.Header.Get("Content-Type"), "application/grpc") {
//		grpcServer.ServeHTTP(w, r)
//	} else {
//		yourMux.ServeHTTP(w, r)
//	}
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		return
	}
	defer s.removeConn(st)
	s.serveStreams(st)
}

// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
	}
	trInfo.firstLine.client = false
	trInfo.firstLine.remoteAddr = st.RemoteAddr()

	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = dl.Sub(time.Now())
	}
	return trInfo
}

func (s *Server) addConn(c io.Closer) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		c.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		c.(transport.ServerTransport).Drain()
	}
	s.conns[c] = true
	return true
}

func (s *Server) removeConn(c io.Closer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns != nil {
		delete(s.conns, c)
		s.cv.Broadcast()
	}
}

func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
	return &channelz.ServerInternalMetric{
		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
	}
}

func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}

func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		grpclog.Errorln("grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		grpclog.Errorln("grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	if trInfo != nil {
		defer trInfo.tr.Finish()
		trInfo.firstLine.client = false
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			if err != nil && err != io.EOF {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
		}()
	}

	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = deadline.Sub(time.Now())
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime: time.Now(),
				Payload:  v,
				Data:     d,
				Length:   len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, status.New(codes.OK, ""))
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = deadline.Sub(time.Now())
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	if srv, ok := s.m[service]; ok {
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known service but unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
		trInfo.tr.SetError()
	}
	errDesc := fmt.Sprintf("unknown service %v", service)
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}

// The key to save ServerTransportStream in the context.
type streamKey struct{}

// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}

// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	Method() string
	SetHeader(md metadata.MD) error
	SendHeader(md metadata.MD) error
	SetTrailer(md metadata.MD) error
}

// ServerTransportStreamFromContext returns the ServerTransportStream saved in
// ctx. Returns nil if the given context has no stream associated with it
// (which implies it is not an RPC invocation context).
//
// This API is EXPERIMENTAL.
func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
	return s
}

// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.serveWG.Wait()
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for c := range s.conns {
			c.(transport.ServerTransport).Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
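
// One way a caller might wire GracefulStop to process signals, given a server
// s, shown as a sketch (the signal choice and the 10-second deadline are
// assumptions, not recommendations from this package):
//
//	go func() {
//		c := make(chan os.Signal, 1)
//		signal.Notify(c, os.Interrupt)
//		<-c
//		done := make(chan struct{})
//		go func() { s.GracefulStop(); close(done) }()
//		select {
//		case <-done:
//		case <-time.After(10 * time.Second):
//			s.Stop() // force-close anything still pending
//		}
//	}()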

// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {
	if s.opts.codec != nil {
		return s.opts.codec
	}
	if contentSubtype == "" {
		return encoding.GetCodec(proto.Name)
	}
	codec := encoding.GetCodec(contentSubtype)
	if codec == nil {
		return encoding.GetCodec(proto.Name)
	}
	return codec
}

// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
//  - grpc.SendHeader() is called;
//  - The first response is sent out;
//  - An RPC status is sent out (error or success).
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetHeader(md)
}

// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	if err := stream.SendHeader(md); err != nil {
		return toRPCErr(err)
	}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	stream := ServerTransportStreamFromContext(ctx)
	if stream == nil {
		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
	}
	return stream.SetTrailer(md)
}
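
// A brief sketch of how a unary handler might use these helpers; the server
// type, request and response types (pb.Point, pb.Feature) and the metadata
// keys are placeholders, not defined in this package:
//
//	func (s *server) GetFeature(ctx context.Context, req *pb.Point) (*pb.Feature, error) {
//		// Header goes out with the first response or status.
//		grpc.SetHeader(ctx, metadata.Pairs("cache", "hit"))
//		// Trailer is sent when the RPC returns.
//		defer grpc.SetTrailer(ctx, metadata.Pairs("handler", "GetFeature"))
//		return lookupFeature(req) // placeholder application logic
//	}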

// Method returns the method string for the server context. The returned
// string is in the format of "/service/method".
func Method(ctx context.Context) (string, bool) {
	s := ServerTransportStreamFromContext(ctx)
	if s == nil {
		return "", false
	}
	return s.Method(), true
}

type channelzServer struct {
	s *Server
}

func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}