blob: edfcdcaee9e37b976e01511448e4af2787c42203 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/grpcsync"
46 "google.golang.org/grpc/internal/transport"
47 "google.golang.org/grpc/keepalive"
48 "google.golang.org/grpc/metadata"
49 "google.golang.org/grpc/peer"
50 "google.golang.org/grpc/stats"
51 "google.golang.org/grpc/status"
52 "google.golang.org/grpc/tap"
53)
54
// Default limits for message sizes accepted/produced by a Server.
const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 // 4MB receive cap unless overridden.
	defaultServerMaxSendMessageSize    = math.MaxInt32   // effectively unlimited send size.
)

// statusOK is the canonical OK status, reused to avoid re-allocating it per RPC.
var statusOK = status.New(codes.OK, "")

// methodHandler is the generated-code entry point for a unary method: it
// decodes the request via dec and invokes the service implementation,
// optionally through interceptor.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
63
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the method name only, without service or package prefix.
	MethodName string
	// Handler is the generated adapter that decodes and dispatches the call.
	Handler methodHandler
}
69
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the full name, in the form <package>.<service>.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods; Streams lists the streaming ones.
	Methods []MethodDesc
	Streams []StreamDesc
	// Metadata is opaque data carried through to GetServiceInfo.
	Metadata interface{}
}
80
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	md     map[string]*MethodDesc // unary methods, keyed by method name
	sd     map[string]*StreamDesc // streaming methods, keyed by stream name
	mdata  interface{}            // ServiceDesc.Metadata, surfaced via GetServiceInfo
}
89
// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts serverOptions

	mu    sync.Mutex // guards following
	lis   map[net.Listener]bool
	conns map[transport.ServerTransport]bool
	serve bool // set once Serve has been called; registration is then illegal
	drain bool // set by GracefulStop; new transports are drained immediately
	cv    *sync.Cond          // signaled when connections close for GracefulStop
	m     map[string]*service // service name -> service info
	events trace.EventLog

	quit               *grpcsync.Event // fired by Stop/GracefulStop
	done               *grpcsync.Event // fired when shutdown is complete
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
111
// serverOptions is the bag of settings assembled by ServerOption values;
// see the corresponding ServerOption constructors for each field's meaning.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor   // legacy outbound compressor (RPCCompressor)
	dc                    Decompressor // legacy inbound decompressor (RPCDecompressor)
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	inTapHandle           tap.ServerInHandle
	statsHandler          stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc // fallback handler for unregistered methods
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32 // nil means transport default
	headerTableSize       *uint32 // nil means transport default
}
137
// defaultServerOptions is the starting configuration copied by NewServer
// before user-supplied ServerOptions are applied.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
145
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	apply(*serverOptions)
}

// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// This API is EXPERIMENTAL.
type EmptyServerOption struct{}

func (EmptyServerOption) apply(*serverOptions) {}
158
159// funcServerOption wraps a function that modifies serverOptions into an
160// implementation of the ServerOption interface.
161type funcServerOption struct {
162 f func(*serverOptions)
163}
164
165func (fdo *funcServerOption) apply(do *serverOptions) {
166 fdo.f(do)
167}
168
169func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
170 return &funcServerOption{
171 f: f,
172 }
173}
174
175// WriteBufferSize determines how much data can be batched before doing a write on the wire.
176// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
177// The default value for this buffer is 32KB.
178// Zero will disable the write buffer such that each write will be on underlying connection.
179// Note: A Send call may not directly translate to a write.
180func WriteBufferSize(s int) ServerOption {
181 return newFuncServerOption(func(o *serverOptions) {
182 o.writeBufferSize = s
183 })
184}
185
186// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
187// for one read syscall.
188// The default value for this buffer is 32KB.
189// Zero will disable read buffer for a connection so data framer can access the underlying
190// conn directly.
191func ReadBufferSize(s int) ServerOption {
192 return newFuncServerOption(func(o *serverOptions) {
193 o.readBufferSize = s
194 })
195}
196
197// InitialWindowSize returns a ServerOption that sets window size for stream.
198// The lower bound for window size is 64K and any value smaller than that will be ignored.
199func InitialWindowSize(s int32) ServerOption {
200 return newFuncServerOption(func(o *serverOptions) {
201 o.initialWindowSize = s
202 })
203}
204
205// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
206// The lower bound for window size is 64K and any value smaller than that will be ignored.
207func InitialConnWindowSize(s int32) ServerOption {
208 return newFuncServerOption(func(o *serverOptions) {
209 o.initialConnWindowSize = s
210 })
211}
212
213// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
214func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
215 if kp.Time > 0 && kp.Time < time.Second {
216 grpclog.Warning("Adjusting keepalive ping interval to minimum period of 1s")
217 kp.Time = time.Second
218 }
219
220 return newFuncServerOption(func(o *serverOptions) {
221 o.keepaliveParams = kp
222 })
223}
224
225// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
226func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
227 return newFuncServerOption(func(o *serverOptions) {
228 o.keepalivePolicy = kep
229 })
230}
231
232// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
233//
234// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
235func CustomCodec(codec Codec) ServerOption {
236 return newFuncServerOption(func(o *serverOptions) {
237 o.codec = codec
238 })
239}
240
241// RPCCompressor returns a ServerOption that sets a compressor for outbound
242// messages. For backward compatibility, all outbound messages will be sent
243// using this compressor, regardless of incoming message compression. By
244// default, server messages will be sent using the same compressor with which
245// request messages were sent.
246//
247// Deprecated: use encoding.RegisterCompressor instead.
248func RPCCompressor(cp Compressor) ServerOption {
249 return newFuncServerOption(func(o *serverOptions) {
250 o.cp = cp
251 })
252}
253
254// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
255// messages. It has higher priority than decompressors registered via
256// encoding.RegisterCompressor.
257//
258// Deprecated: use encoding.RegisterCompressor instead.
259func RPCDecompressor(dc Decompressor) ServerOption {
260 return newFuncServerOption(func(o *serverOptions) {
261 o.dc = dc
262 })
263}
264
265// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
266// If this is not set, gRPC uses the default limit.
267//
268// Deprecated: use MaxRecvMsgSize instead.
269func MaxMsgSize(m int) ServerOption {
270 return MaxRecvMsgSize(m)
271}
272
273// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
274// If this is not set, gRPC uses the default 4MB.
275func MaxRecvMsgSize(m int) ServerOption {
276 return newFuncServerOption(func(o *serverOptions) {
277 o.maxReceiveMessageSize = m
278 })
279}
280
281// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
282// If this is not set, gRPC uses the default `math.MaxInt32`.
283func MaxSendMsgSize(m int) ServerOption {
284 return newFuncServerOption(func(o *serverOptions) {
285 o.maxSendMessageSize = m
286 })
287}
288
289// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
290// of concurrent streams to each ServerTransport.
291func MaxConcurrentStreams(n uint32) ServerOption {
292 return newFuncServerOption(func(o *serverOptions) {
293 o.maxConcurrentStreams = n
294 })
295}
296
297// Creds returns a ServerOption that sets credentials for server connections.
298func Creds(c credentials.TransportCredentials) ServerOption {
299 return newFuncServerOption(func(o *serverOptions) {
300 o.creds = c
301 })
302}
303
304// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
305// server. Only one unary interceptor can be installed. The construction of multiple
306// interceptors (e.g., chaining) can be implemented at the caller.
307func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
308 return newFuncServerOption(func(o *serverOptions) {
309 if o.unaryInt != nil {
310 panic("The unary server interceptor was already set and may not be reset.")
311 }
312 o.unaryInt = i
313 })
314}
315
316// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
317// for unary RPCs. The first interceptor will be the outer most,
318// while the last interceptor will be the inner most wrapper around the real call.
319// All unary interceptors added by this method will be chained.
320func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
321 return newFuncServerOption(func(o *serverOptions) {
322 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
323 })
324}
325
326// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
327// server. Only one stream interceptor can be installed.
328func StreamInterceptor(i StreamServerInterceptor) ServerOption {
329 return newFuncServerOption(func(o *serverOptions) {
330 if o.streamInt != nil {
331 panic("The stream server interceptor was already set and may not be reset.")
332 }
333 o.streamInt = i
334 })
335}
336
337// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
338// for stream RPCs. The first interceptor will be the outer most,
339// while the last interceptor will be the inner most wrapper around the real call.
340// All stream interceptors added by this method will be chained.
341func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
342 return newFuncServerOption(func(o *serverOptions) {
343 o.chainStreamInts = append(o.chainStreamInts, interceptors...)
344 })
345}
346
347// InTapHandle returns a ServerOption that sets the tap handle for all the server
348// transport to be created. Only one can be installed.
349func InTapHandle(h tap.ServerInHandle) ServerOption {
350 return newFuncServerOption(func(o *serverOptions) {
351 if o.inTapHandle != nil {
352 panic("The tap handle was already set and may not be reset.")
353 }
354 o.inTapHandle = h
355 })
356}
357
358// StatsHandler returns a ServerOption that sets the stats handler for the server.
359func StatsHandler(h stats.Handler) ServerOption {
360 return newFuncServerOption(func(o *serverOptions) {
361 o.statsHandler = h
362 })
363}
364
365// UnknownServiceHandler returns a ServerOption that allows for adding a custom
366// unknown service handler. The provided method is a bidi-streaming RPC service
367// handler that will be invoked instead of returning the "unimplemented" gRPC
368// error whenever a request is received for an unregistered service or method.
369// The handling function and stream interceptor (if set) have full access to
370// the ServerStream, including its Context.
371func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
372 return newFuncServerOption(func(o *serverOptions) {
373 o.unknownStreamDesc = &StreamDesc{
374 StreamName: "unknown_service_handler",
375 Handler: streamHandler,
376 // We need to assume that the users of the streamHandler will want to use both.
377 ClientStreams: true,
378 ServerStreams: true,
379 }
380 })
381}
382
383// ConnectionTimeout returns a ServerOption that sets the timeout for
384// connection establishment (up to and including HTTP/2 handshaking) for all
385// new connections. If this is not set, the default is 120 seconds. A zero or
386// negative value will result in an immediate timeout.
387//
388// This API is EXPERIMENTAL.
389func ConnectionTimeout(d time.Duration) ServerOption {
390 return newFuncServerOption(func(o *serverOptions) {
391 o.connectionTimeout = d
392 })
393}
394
395// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
396// of header list that the server is prepared to accept.
397func MaxHeaderListSize(s uint32) ServerOption {
398 return newFuncServerOption(func(o *serverOptions) {
399 o.maxHeaderListSize = &s
400 })
401}
402
403// HeaderTableSize returns a ServerOption that sets the size of dynamic
404// header table for stream.
405//
406// This API is EXPERIMENTAL.
407func HeaderTableSize(s uint32) ServerOption {
408 return newFuncServerOption(func(o *serverOptions) {
409 o.headerTableSize = &s
410 })
411}
412
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	// Start from the package defaults, then layer user options on top.
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:    make(map[net.Listener]bool),
		opts:   opts,
		conns:  make(map[transport.ServerTransport]bool),
		m:      make(map[string]*service),
		quit:   grpcsync.NewEvent(),
		done:   grpcsync.NewEvent(),
		czData: new(channelzData),
	}
	// Collapse interceptor chains into single unaryInt/streamInt values
	// before any RPC is served.
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		// Record the caller's location so the event log identifies who
		// constructed this server.
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}
442
443// printf records an event in s's event log, unless s has been stopped.
444// REQUIRES s.mu is held.
445func (s *Server) printf(format string, a ...interface{}) {
446 if s.events != nil {
447 s.events.Printf(format, a...)
448 }
449}
450
451// errorf records an error in s's event log, unless s has been stopped.
452// REQUIRES s.mu is held.
453func (s *Server) errorf(format string, a ...interface{}) {
454 if s.events != nil {
455 s.events.Errorf(format, a...)
456 }
457}
458
459// RegisterService registers a service and its implementation to the gRPC
460// server. It is called from the IDL generated code. This must be called before
461// invoking Serve.
462func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
463 ht := reflect.TypeOf(sd.HandlerType).Elem()
464 st := reflect.TypeOf(ss)
465 if !st.Implements(ht) {
466 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
467 }
468 s.register(sd, ss)
469}
470
// register records the service under s.m. It is fatal to register after
// Serve has started or to register the same service name twice.
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.m[sd.ServiceName]; ok {
		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		sd:     make(map[string]*StreamDesc),
		mdata:  sd.Metadata,
	}
	// Index by iterating with indices so the maps hold pointers into
	// sd.Methods/sd.Streams rather than pointers to loop-variable copies.
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		srv.sd[d.StreamName] = d
	}
	s.m[sd.ServiceName] = srv
}
497
// MethodInfo contains the information of an RPC including its method name and type.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
514
515// GetServiceInfo returns a map from service names to ServiceInfo.
516// Service names include the package names, in the form of <package>.<service>.
517func (s *Server) GetServiceInfo() map[string]ServiceInfo {
518 ret := make(map[string]ServiceInfo)
519 for n, srv := range s.m {
520 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
521 for m := range srv.md {
522 methods = append(methods, MethodInfo{
523 Name: m,
524 IsClientStream: false,
525 IsServerStream: false,
526 })
527 }
528 for m, d := range srv.sd {
529 methods = append(methods, MethodInfo{
530 Name: m,
531 IsClientStream: d.ClientStreams,
532 IsServerStream: d.ServerStreams,
533 })
534 }
535
536 ret[n] = ServiceInfo{
537 Methods: methods,
538 Metadata: srv.mdata,
539 }
540 }
541 return ret
542}
543
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Returned by Serve when called after Stop or
// GracefulStop.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
547
548func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
549 if s.opts.creds == nil {
550 return rawConn, nil, nil
551 }
552 return s.opts.creds.ServerHandshake(rawConn)
553}
554
555type listenSocket struct {
556 net.Listener
557 channelzID int64
558}
559
560func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
561 return &channelz.SocketInternalMetric{
562 SocketOptions: channelz.GetSocketOption(l.Listener),
563 LocalAddr: l.Listener.Addr(),
564 }
565}
566
567func (l *listenSocket) Close() error {
568 err := l.Listener.Close()
569 if channelz.IsOn() {
570 channelz.RemoveEntry(l.channelzID)
571 }
572 return err
573}
574
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Account for this Serve call so GracefulStop can wait for it.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and deregister the listener unless Stop/GracefulStop
	// already did so (in which case s.lis is nil or no longer contains ls).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Temporary errors (e.g. fd exhaustion) are retried with
			// exponential backoff from 5ms up to 1s.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// A fatal Accept error after Stop/GracefulStop is expected
			// (the listener was closed); report success in that case.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
668
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Bound the entire handshake (credentials + HTTP/2) by connectionTimeout;
	// the deadline is cleared once the transport is established.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		// newHTTP2Transport already logged and closed the conn.
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		// Server stopped while we were handshaking; addConn closed st.
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}
707
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
// It returns nil on failure, after logging and closing c.
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
	// Project the relevant server options into the transport's config.
	config := &transport.ServerConfig{
		MaxStreams:            s.opts.maxConcurrentStreams,
		AuthInfo:              authInfo,
		InTapHandle:           s.opts.inTapHandle,
		StatsHandler:          s.opts.statsHandler,
		KeepaliveParams:       s.opts.keepaliveParams,
		KeepalivePolicy:       s.opts.keepalivePolicy,
		InitialWindowSize:     s.opts.initialWindowSize,
		InitialConnWindowSize: s.opts.initialConnWindowSize,
		WriteBufferSize:       s.opts.writeBufferSize,
		ReadBufferSize:        s.opts.readBufferSize,
		ChannelzParentID:      s.channelzID,
		MaxHeaderListSize:     s.opts.maxHeaderListSize,
		HeaderTableSize:       s.opts.headerTableSize,
	}
	st, err := transport.NewServerTransport("http2", c, config)
	if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
		c.Close()
		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
		return nil
	}

	return st
}
738
// serveStreams handles every stream arriving on st, one goroutine per
// stream, and blocks until the transport is done. The transport is closed
// when this returns.
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		// Context-decorator: attach a trace to the stream's context when
		// tracing is enabled.
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	// Wait for all per-stream handler goroutines before closing st.
	wg.Wait()
}
757
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)

// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
//
// The provided HTTP request must have arrived on an HTTP/2
// connection. When using the Go standard library's server,
// practically this means that the Request must also have arrived
// over TLS.
//
// To share one port (such as 443 for https) between gRPC and an
// existing http.Handler, use a root http.Handler such as:
//
//   if r.ProtoMajor == 2 && strings.HasPrefix(
//   	r.Header.Get("Content-Type"), "application/grpc") {
//   	grpcServer.ServeHTTP(w, r)
//   } else {
//   	yourMux.ServeHTTP(w, r)
//   }
//
// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
// separate from grpc-go's HTTP/2 server. Performance and features may vary
// between the two paths. ServeHTTP does not support some gRPC features
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if !s.addConn(st) {
		// Server stopped; addConn already closed st.
		return
	}
	defer s.removeConn(st)
	// Serves this single connection's streams; returns when r's stream ends.
	s.serveStreams(st)
}
796
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
	if !EnableTracing {
		return nil
	}
	// The trace was attached to the stream context by serveStreams'
	// context decorator; if it is missing, tracing was off at accept time.
	tr, ok := trace.FromContext(stream.Context())
	if !ok {
		return nil
	}

	trInfo = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: st.RemoteAddr(),
		},
	}
	if dl, ok := stream.Context().Deadline(); ok {
		trInfo.firstLine.deadline = time.Until(dl)
	}
	return trInfo
}
820
// addConn records st in s.conns. It returns false (after closing st) when
// the server has already been stopped.
func (s *Server) addConn(st transport.ServerTransport) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conns == nil {
		// Stop/GracefulStop already ran; reject the transport.
		st.Close()
		return false
	}
	if s.drain {
		// Transport added after we drained our existing conns: drain it
		// immediately.
		st.Drain()
	}
	s.conns[st] = true
	return true
}
836
837func (s *Server) removeConn(st transport.ServerTransport) {
838 s.mu.Lock()
839 defer s.mu.Unlock()
840 if s.conns != nil {
841 delete(s.conns, st)
842 s.cv.Broadcast()
843 }
844}
845
846func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
847 return &channelz.ServerInternalMetric{
848 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
849 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
850 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
851 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
852 }
853}
854
// incrCallsStarted bumps the started-call counter and records the start time
// (nanoseconds since epoch) for channelz reporting.
func (s *Server) incrCallsStarted() {
	atomic.AddInt64(&s.czData.callsStarted, 1)
	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded bumps the succeeded-call counter for channelz reporting.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}

// incrCallsFailed bumps the failed-call counter for channelz reporting.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}
867
// sendResponse encodes, optionally compresses, and writes msg on stream.
// At most one of cp (legacy Compressor) and comp (encoding.Compressor) is
// expected to be set. Returns a status error if the serialized payload
// exceeds maxSendMessageSize.
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to encode response: ", err)
		return err
	}
	compData, err := compress(data, cp, comp)
	if err != nil {
		channelz.Error(s.channelzID, "grpc: server failed to compress response: ", err)
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > s.opts.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
	}
	err = t.Write(stream, hdr, payload, opts)
	// Report the outgoing payload to the stats handler only on success.
	if err == nil && s.opts.statsHandler != nil {
		s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
	}
	return err
}
890
891// chainUnaryServerInterceptors chains all unary server interceptors into one.
892func chainUnaryServerInterceptors(s *Server) {
893 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
894 // be executed before any other chained interceptors.
895 interceptors := s.opts.chainUnaryInts
896 if s.opts.unaryInt != nil {
897 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
898 }
899
900 var chainedInt UnaryServerInterceptor
901 if len(interceptors) == 0 {
902 chainedInt = nil
903 } else if len(interceptors) == 1 {
904 chainedInt = interceptors[0]
905 } else {
906 chainedInt = func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
907 return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
908 }
909 }
910
911 s.opts.unaryInt = chainedInt
912}
913
914// getChainUnaryHandler recursively generate the chained UnaryHandler
915func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
916 if curr == len(interceptors)-1 {
917 return finalHandler
918 }
919
920 return func(ctx context.Context, req interface{}) (interface{}, error) {
921 return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
922 }
923}
924
// processUnaryRPC handles a single unary RPC on stream: it reads and decodes
// the one request message, invokes the registered method handler (wrapped by
// the configured unary interceptor, if any), sends the response message and
// writes the final status. Along the way it drives the optional observers:
// stats handler, trace events, channelz call counters and binary logging.
// The returned error is nil on success; io.EOF from sendResponse means the
// stream already ended.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	sh := s.opts.statsHandler
	if sh != nil || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		var statsBegin *stats.Begin
		if sh != nil {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime: beginTime,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime: time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: if a method logger is registered for this method, record
	// the client's header (metadata, timeout, authority, peer) before payloads.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// Note: this md shadows the *MethodDesc parameter for this scope.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header: md,
			MethodName: stream.Method(),
			PeerAddr: nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression. decomp and dc are used for
	// decompression. If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			// No installed decompressor can decode the request body; fail the
			// RPC before attempting to read it.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo captures wire-level payload details; it is only needed when a
	// stats handler or binary logger will consume them.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is the decode function handed to the generated method handler: it
	// unmarshals the already-received bytes d into the handler's request value
	// and then notifies the stats handler, binary logger and tracer.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime: time.Now(),
				Payload: v,
				WireLength: payInfo.wireLength,
				Data: d,
				Length: len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	// Invoke the application handler (through the chained unary interceptor,
	// if one is configured).
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err: appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				// Any other error type from sendResponse is a programming
				// error; crash loudly rather than mask it.
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err: appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, statusOK)
	if binlog != nil {
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err: appErr,
		})
	}
	return err
}
1173
1174// chainStreamServerInterceptors chains all stream server interceptors into one.
1175func chainStreamServerInterceptors(s *Server) {
1176 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1177 // be executed before any other chained interceptors.
1178 interceptors := s.opts.chainStreamInts
1179 if s.opts.streamInt != nil {
1180 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1181 }
1182
1183 var chainedInt StreamServerInterceptor
1184 if len(interceptors) == 0 {
1185 chainedInt = nil
1186 } else if len(interceptors) == 1 {
1187 chainedInt = interceptors[0]
1188 } else {
1189 chainedInt = func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
1190 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1191 }
1192 }
1193
1194 s.opts.streamInt = chainedInt
1195}
1196
1197// getChainStreamHandler recursively generate the chained StreamHandler
1198func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1199 if curr == len(interceptors)-1 {
1200 return finalHandler
1201 }
1202
1203 return func(srv interface{}, ss ServerStream) error {
1204 return interceptors[curr+1](srv, ss, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
1205 }
1206}
1207
// processStreamingRPC handles a streaming RPC on stream: it builds the
// serverStream handed to the application handler, configures compression,
// invokes the stream handler (through the chained stream interceptor, if
// set), and writes the final status. Stats, tracing, channelz and binary
// logging are driven the same way as in processUnaryRPC. srv may be nil when
// the RPC is dispatched to the unknown-service handler's StreamDesc.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
	}
	sh := s.opts.statsHandler
	var statsBegin *stats.Begin
	if sh != nil {
		beginTime := time.Now()
		statsBegin = &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), statsBegin)
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	// ss is the ServerStream implementation the application handler sees.
	ss := &serverStream{
		ctx: ctx,
		t: t,
		s: stream,
		p: &parser{r: stream},
		codec: s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize: s.opts.maxSendMessageSize,
		trInfo: trInfo,
		statsHandler: sh,
	}

	if sh != nil || trInfo != nil || channelz.IsOn() {
		// See comment in processUnaryRPC on defers.
		defer func() {
			if trInfo != nil {
				ss.mu.Lock()
				if err != nil && err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
				ss.trInfo.tr.Finish()
				ss.trInfo.tr = nil
				ss.mu.Unlock()
			}

			if sh != nil {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime: time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}

	// Binary logging: if a method logger is registered for this method, record
	// the client's header (metadata, timeout, authority, peer) before payloads.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header: md,
			MethodName: stream.Method(),
			PeerAddr: nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it. Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			// No installed decompressor can decode incoming messages; fail the
			// RPC up front.
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it. Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
	}
	// Invoke the application stream handler, either directly or through the
	// chained stream interceptor. server stays nil for the unknown-service
	// fallback (srv == nil).
	var appErr error
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod: stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err: appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, statusOK)
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err: appErr,
		})
	}
	return err
}
1374
// handleStream routes an accepted transport stream to its registered handler.
// The method string "/service/method" is split at its last '/', the service is
// looked up in s.m, and the RPC is dispatched to processUnaryRPC or
// processStreamingRPC. Anything unresolved falls back to
// opts.unknownStreamDesc when configured, otherwise the stream is rejected
// with an Unimplemented status.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	// Strip the leading '/', then split the remainder at the last '/'.
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		// Not of the form "service/method": reject the stream.
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		// NOTE(review): ResourceExhausted is a surprising code for a malformed
		// method name (Unimplemented/Internal would read more naturally) —
		// clients observe this code on the wire, so confirm intent before
		// changing it.
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.m[service]
	if knownService {
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.sd[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		// The unknown-service fallback is dispatched with a nil service.
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}
1439
// streamKey is the context key under which the ServerTransportStream is
// saved. An unexported struct{} type cannot collide with context keys defined
// by other packages.
type streamKey struct{}
1442
// NewContextWithServerTransportStream creates a new context from ctx and
// attaches stream to it.
//
// The stream can later be retrieved with ServerTransportStreamFromContext.
//
// This API is EXPERIMENTAL.
func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
	return context.WithValue(ctx, streamKey{}, stream)
}
1450
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// This API is EXPERIMENTAL.
type ServerTransportStream interface {
	// Method returns the full RPC method string, i.e. /service/method.
	Method() string
	// SetHeader merges md into the header metadata to be sent out later
	// (see grpc.SetHeader for when headers are flushed).
	SetHeader(md metadata.MD) error
	// SendHeader sends the header metadata; callable at most once
	// (see grpc.SendHeader).
	SendHeader(md metadata.MD) error
	// SetTrailer merges md into the trailer metadata sent when the RPC
	// returns (see grpc.SetTrailer).
	SetTrailer(md metadata.MD) error
}
1465
1466// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1467// ctx. Returns nil if the given context has no stream associated with it
1468// (which implies it is not an RPC invocation context).
1469//
1470// This API is EXPERIMENTAL.
1471func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1472 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1473 return s
1474}
1475
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
	s.quit.Fire()

	// Fire done only after every Serve goroutine has returned.
	defer func() {
		s.serveWG.Wait()
		s.done.Fire()
	}()

	// Deregister from channelz exactly once, even if Stop and GracefulStop
	// are both invoked.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})

	// Detach listeners and connections under the lock, then close them
	// after releasing it so network teardown happens without holding s.mu.
	s.mu.Lock()
	listeners := s.lis
	s.lis = nil
	st := s.conns
	s.conns = nil
	// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
	s.cv.Broadcast()
	s.mu.Unlock()

	for lis := range listeners {
		lis.Close()
	}
	for c := range st {
		c.Close()
	}

	s.mu.Lock()
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1518
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
	s.quit.Fire()
	defer s.done.Fire()

	// Deregister from channelz exactly once, even if Stop and GracefulStop
	// are both invoked.
	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		// Already stopped; nothing to do.
		s.mu.Unlock()
		return
	}

	// Stop accepting new connections, then ask the existing ones to drain so
	// in-flight RPCs can complete. s.drain guards against draining twice.
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	if !s.drain {
		for st := range s.conns {
			st.Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit. Only then can we be sure no
	// new conns will be created.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until every connection is removed; Stop() broadcasts on s.cv to
	// interrupt this wait if called concurrently.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1564
1565// contentSubtype must be lowercase
1566// cannot return nil
1567func (s *Server) getCodec(contentSubtype string) baseCodec {
1568 if s.opts.codec != nil {
1569 return s.opts.codec
1570 }
1571 if contentSubtype == "" {
1572 return encoding.GetCodec(proto.Name)
1573 }
1574 codec := encoding.GetCodec(contentSubtype)
1575 if codec == nil {
1576 return encoding.GetCodec(proto.Name)
1577 }
1578 return codec
1579}
1580
1581// SetHeader sets the header metadata.
1582// When called multiple times, all the provided metadata will be merged.
1583// All the metadata will be sent out when one of the following happens:
1584// - grpc.SendHeader() is called;
1585// - The first response is sent out;
1586// - An RPC status is sent out (error or success).
1587func SetHeader(ctx context.Context, md metadata.MD) error {
1588 if md.Len() == 0 {
1589 return nil
1590 }
1591 stream := ServerTransportStreamFromContext(ctx)
1592 if stream == nil {
1593 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1594 }
1595 return stream.SetHeader(md)
1596}
1597
1598// SendHeader sends header metadata. It may be called at most once.
1599// The provided md and headers set by SetHeader() will be sent.
1600func SendHeader(ctx context.Context, md metadata.MD) error {
1601 stream := ServerTransportStreamFromContext(ctx)
1602 if stream == nil {
1603 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1604 }
1605 if err := stream.SendHeader(md); err != nil {
1606 return toRPCErr(err)
1607 }
1608 return nil
1609}
1610
1611// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1612// When called more than once, all the provided metadata will be merged.
1613func SetTrailer(ctx context.Context, md metadata.MD) error {
1614 if md.Len() == 0 {
1615 return nil
1616 }
1617 stream := ServerTransportStreamFromContext(ctx)
1618 if stream == nil {
1619 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1620 }
1621 return stream.SetTrailer(md)
1622}
1623
1624// Method returns the method string for the server context. The returned
1625// string is in the format of "/service/method".
1626func Method(ctx context.Context) (string, bool) {
1627 s := ServerTransportStreamFromContext(ctx)
1628 if s == nil {
1629 return "", false
1630 }
1631 return s.Method(), true
1632}
1633
// channelzServer adapts a *Server to the interface channelz expects by
// exposing its metrics (see ChannelzMetric below).
type channelzServer struct {
	s *Server
}
1637
// ChannelzMetric returns the channelz metrics of the wrapped server.
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
	return c.s.channelzMetric()
}