blob: 81969e7c15a98df52ae46d359ac9f8794031e430 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "math"
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
33 "sync/atomic"
34 "time"
35
36 "golang.org/x/net/trace"
37
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
42 "google.golang.org/grpc/grpclog"
43 "google.golang.org/grpc/internal"
44 "google.golang.org/grpc/internal/binarylog"
45 "google.golang.org/grpc/internal/channelz"
khenaidoo5fc5cea2021-08-11 17:39:16 -040046 "google.golang.org/grpc/internal/grpcsync"
Akash Kankanala761955c2024-02-21 19:32:20 +053047 "google.golang.org/grpc/internal/grpcutil"
khenaidoo5fc5cea2021-08-11 17:39:16 -040048 "google.golang.org/grpc/internal/transport"
49 "google.golang.org/grpc/keepalive"
50 "google.golang.org/grpc/metadata"
51 "google.golang.org/grpc/peer"
52 "google.golang.org/grpc/stats"
53 "google.golang.org/grpc/status"
54 "google.golang.org/grpc/tap"
55)
56
const (
	// defaultServerMaxReceiveMessageSize is the receive-size limit, in bytes,
	// placed in defaultServerOptions (4 * 1024 * 1024).
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the send-size limit, in bytes,
	// placed in defaultServerOptions.
	defaultServerMaxSendMessageSize = math.MaxInt32

	// Server transports are tracked in a map which is keyed on listener
	// address. For regular gRPC traffic, connections are accepted in Serve()
	// through a call to Accept(), and we use the actual listener address as key
	// when we add it to the map. But for connections received through
	// ServeHTTP(), we do not have a listener and hence use this dummy value.
	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
68
69func init() {
70 internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
71 return srv.opts.creds
72 }
73 internal.DrainServerTransports = func(srv *Server, addr string) {
74 srv.drainServerTransports(addr)
75 }
Akash Kankanala761955c2024-02-21 19:32:20 +053076 internal.AddGlobalServerOptions = func(opt ...ServerOption) {
77 globalServerOptions = append(globalServerOptions, opt...)
78 }
79 internal.ClearGlobalServerOptions = func() {
80 globalServerOptions = nil
81 }
82 internal.BinaryLogger = binaryLogger
83 internal.JoinServerOptions = newJoinServerOption
khenaidoo5fc5cea2021-08-11 17:39:16 -040084}
85
// statusOK is a status value carrying codes.OK and an empty message.
var statusOK = status.New(codes.OK, "")

// logger is the grpclog component logger for "core" used throughout this file.
var logger = grpclog.Component("core")
88
// methodHandler is the function type stored in MethodDesc.Handler.
// NOTE(review): going by the parameter names, srv is the service
// implementation, dec decodes the request into the value it is given, and
// interceptor is the unary interceptor to run (if any) — confirm against the
// generated code that supplies these handlers.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
90
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is stored when the owning
	// service is registered.
	MethodName string
	// Handler is the function associated with MethodName.
	Handler methodHandler
}
96
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is registered; a name
	// may be registered only once per server.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the method descriptors, indexed by MethodName on
	// registration.
	Methods []MethodDesc
	// Streams lists the stream descriptors, indexed by StreamName on
	// registration.
	Streams []StreamDesc
	// Metadata is copied as-is into the registered service info and is
	// returned by GetServiceInfo.
	Metadata interface{}
}
107
// serviceInfo wraps information about a service. It is very similar to
// ServiceDesc and is constructed from it for internal purposes.
type serviceInfo struct {
	// Contains the implementation for the methods in this service.
	serviceImpl interface{}
	// methods maps MethodName to its descriptor.
	methods map[string]*MethodDesc
	// streams maps StreamName to its descriptor.
	streams map[string]*StreamDesc
	// mdata holds ServiceDesc.Metadata unchanged.
	mdata interface{}
}
117
// serverWorkerData is the unit of work sent over Server.serverWorkerChannel
// and consumed by handleSingleStream.
type serverWorkerData struct {
	// st is the transport the stream arrived on.
	st transport.ServerTransport
	// wg has Done called on it once the stream has been handled.
	wg *sync.WaitGroup
	// stream is the stream to handle.
	stream *transport.Stream
}
123
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts holds the configuration computed by NewServer from the defaults,
	// the global options, and the caller-supplied options.
	opts serverOptions

	mu  sync.Mutex // guards following
	lis map[net.Listener]bool
	// conns contains all active server transports. It is a map keyed on a
	// listener address with the value being the set of active transports
	// belonging to that listener.
	conns    map[string]map[transport.ServerTransport]bool
	serve    bool                    // set by Serve; registration is rejected once true
	drain    bool
	cv       *sync.Cond              // signaled when connections close for GracefulStop
	services map[string]*serviceInfo // service name -> service info
	events   trace.EventLog          // non-nil only when EnableTracing was set at construction

	// quit is checked by Serve and handleRawConn to detect that the server
	// is stopping; done is waited on by Serve before it returns in that case.
	quit               *grpcsync.Event
	done               *grpcsync.Event
	channelzRemoveOnce sync.Once
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID *channelz.Identifier
	czData     *channelzData

	// serverWorkerChannel feeds the serverWorker goroutines; it is created
	// only when numServerWorkers is greater than zero.
	serverWorkerChannel chan *serverWorkerData
}
150
// serverOptions is the set of configuration values a ServerOption can
// modify. NewServer starts from defaultServerOptions and applies options to
// a copy of it.
type serverOptions struct {
	creds                 credentials.TransportCredentials
	codec                 baseCodec
	cp                    Compressor
	dc                    Decompressor
	unaryInt              UnaryServerInterceptor
	streamInt             StreamServerInterceptor
	chainUnaryInts        []UnaryServerInterceptor
	chainStreamInts       []StreamServerInterceptor
	binaryLogger          binarylog.Logger
	inTapHandle           tap.ServerInHandle
	statsHandlers         []stats.Handler
	maxConcurrentStreams  uint32
	maxReceiveMessageSize int
	maxSendMessageSize    int
	unknownStreamDesc     *StreamDesc
	keepaliveParams       keepalive.ServerParameters
	keepalivePolicy       keepalive.EnforcementPolicy
	initialWindowSize     int32
	initialConnWindowSize int32
	writeBufferSize       int
	readBufferSize        int
	connectionTimeout     time.Duration
	maxHeaderListSize     *uint32 // nil unless MaxHeaderListSize was applied
	headerTableSize       *uint32 // nil unless HeaderTableSize was applied
	numServerWorkers      uint32  // zero means no worker goroutines are started
}
178
// defaultServerOptions is the starting configuration that NewServer copies
// before applying any options.
var defaultServerOptions = serverOptions{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second,
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}

// globalServerOptions are applied by NewServer to every new server, before
// the options passed by the caller. The list is modified through the
// internal.AddGlobalServerOptions and internal.ClearGlobalServerOptions hooks.
var globalServerOptions []ServerOption
khenaidoo5fc5cea2021-08-11 17:39:16 -0400187
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
	// apply mutates the given serverOptions in place. NewServer calls it once
	// per option, in the order the options were supplied.
	apply(*serverOptions)
}
192
// EmptyServerOption does not alter the server configuration. It can be embedded
// in another structure to build custom server options.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type EmptyServerOption struct{}

// apply satisfies ServerOption and deliberately does nothing.
func (EmptyServerOption) apply(*serverOptions) {}
203
204// funcServerOption wraps a function that modifies serverOptions into an
205// implementation of the ServerOption interface.
206type funcServerOption struct {
207 f func(*serverOptions)
208}
209
210func (fdo *funcServerOption) apply(do *serverOptions) {
211 fdo.f(do)
212}
213
214func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
215 return &funcServerOption{
216 f: f,
217 }
218}
219
Akash Kankanala761955c2024-02-21 19:32:20 +0530220// joinServerOption provides a way to combine arbitrary number of server
221// options into one.
222type joinServerOption struct {
223 opts []ServerOption
224}
225
226func (mdo *joinServerOption) apply(do *serverOptions) {
227 for _, opt := range mdo.opts {
228 opt.apply(do)
229 }
230}
231
232func newJoinServerOption(opts ...ServerOption) ServerOption {
233 return &joinServerOption{opts: opts}
234}
235
236// WriteBufferSize determines how much data can be batched before doing a write
237// on the wire. The corresponding memory allocation for this buffer will be
238// twice the size to keep syscalls low. The default value for this buffer is
239// 32KB. Zero or negative values will disable the write buffer such that each
240// write will be on underlying connection.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400241// Note: A Send call may not directly translate to a write.
242func WriteBufferSize(s int) ServerOption {
243 return newFuncServerOption(func(o *serverOptions) {
244 o.writeBufferSize = s
245 })
246}
247
Akash Kankanala761955c2024-02-21 19:32:20 +0530248// ReadBufferSize lets you set the size of read buffer, this determines how much
249// data can be read at most for one read syscall. The default value for this
250// buffer is 32KB. Zero or negative values will disable read buffer for a
251// connection so data framer can access the underlying conn directly.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400252func ReadBufferSize(s int) ServerOption {
253 return newFuncServerOption(func(o *serverOptions) {
254 o.readBufferSize = s
255 })
256}
257
258// InitialWindowSize returns a ServerOption that sets window size for stream.
259// The lower bound for window size is 64K and any value smaller than that will be ignored.
260func InitialWindowSize(s int32) ServerOption {
261 return newFuncServerOption(func(o *serverOptions) {
262 o.initialWindowSize = s
263 })
264}
265
266// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
267// The lower bound for window size is 64K and any value smaller than that will be ignored.
268func InitialConnWindowSize(s int32) ServerOption {
269 return newFuncServerOption(func(o *serverOptions) {
270 o.initialConnWindowSize = s
271 })
272}
273
274// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
275func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
276 if kp.Time > 0 && kp.Time < time.Second {
277 logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
278 kp.Time = time.Second
279 }
280
281 return newFuncServerOption(func(o *serverOptions) {
282 o.keepaliveParams = kp
283 })
284}
285
286// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
287func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
288 return newFuncServerOption(func(o *serverOptions) {
289 o.keepalivePolicy = kep
290 })
291}
292
293// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
294//
295// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
296//
297// Deprecated: register codecs using encoding.RegisterCodec. The server will
298// automatically use registered codecs based on the incoming requests' headers.
299// See also
300// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
301// Will be supported throughout 1.x.
302func CustomCodec(codec Codec) ServerOption {
303 return newFuncServerOption(func(o *serverOptions) {
304 o.codec = codec
305 })
306}
307
308// ForceServerCodec returns a ServerOption that sets a codec for message
309// marshaling and unmarshaling.
310//
311// This will override any lookups by content-subtype for Codecs registered
312// with RegisterCodec.
313//
314// See Content-Type on
315// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
316// more details. Also see the documentation on RegisterCodec and
317// CallContentSubtype for more details on the interaction between encoding.Codec
318// and content-subtype.
319//
320// This function is provided for advanced users; prefer to register codecs
321// using encoding.RegisterCodec.
322// The server will automatically use registered codecs based on the incoming
323// requests' headers. See also
324// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
325// Will be supported throughout 1.x.
326//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500327// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400328//
329// Notice: This API is EXPERIMENTAL and may be changed or removed in a
330// later release.
331func ForceServerCodec(codec encoding.Codec) ServerOption {
332 return newFuncServerOption(func(o *serverOptions) {
333 o.codec = codec
334 })
335}
336
337// RPCCompressor returns a ServerOption that sets a compressor for outbound
338// messages. For backward compatibility, all outbound messages will be sent
339// using this compressor, regardless of incoming message compression. By
340// default, server messages will be sent using the same compressor with which
341// request messages were sent.
342//
343// Deprecated: use encoding.RegisterCompressor instead. Will be supported
344// throughout 1.x.
345func RPCCompressor(cp Compressor) ServerOption {
346 return newFuncServerOption(func(o *serverOptions) {
347 o.cp = cp
348 })
349}
350
351// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
352// messages. It has higher priority than decompressors registered via
353// encoding.RegisterCompressor.
354//
355// Deprecated: use encoding.RegisterCompressor instead. Will be supported
356// throughout 1.x.
357func RPCDecompressor(dc Decompressor) ServerOption {
358 return newFuncServerOption(func(o *serverOptions) {
359 o.dc = dc
360 })
361}
362
363// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
364// If this is not set, gRPC uses the default limit.
365//
366// Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
367func MaxMsgSize(m int) ServerOption {
368 return MaxRecvMsgSize(m)
369}
370
371// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
372// If this is not set, gRPC uses the default 4MB.
373func MaxRecvMsgSize(m int) ServerOption {
374 return newFuncServerOption(func(o *serverOptions) {
375 o.maxReceiveMessageSize = m
376 })
377}
378
379// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
380// If this is not set, gRPC uses the default `math.MaxInt32`.
381func MaxSendMsgSize(m int) ServerOption {
382 return newFuncServerOption(func(o *serverOptions) {
383 o.maxSendMessageSize = m
384 })
385}
386
387// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
388// of concurrent streams to each ServerTransport.
389func MaxConcurrentStreams(n uint32) ServerOption {
390 return newFuncServerOption(func(o *serverOptions) {
391 o.maxConcurrentStreams = n
392 })
393}
394
395// Creds returns a ServerOption that sets credentials for server connections.
396func Creds(c credentials.TransportCredentials) ServerOption {
397 return newFuncServerOption(func(o *serverOptions) {
398 o.creds = c
399 })
400}
401
402// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
403// server. Only one unary interceptor can be installed. The construction of multiple
404// interceptors (e.g., chaining) can be implemented at the caller.
405func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
406 return newFuncServerOption(func(o *serverOptions) {
407 if o.unaryInt != nil {
408 panic("The unary server interceptor was already set and may not be reset.")
409 }
410 o.unaryInt = i
411 })
412}
413
414// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
415// for unary RPCs. The first interceptor will be the outer most,
416// while the last interceptor will be the inner most wrapper around the real call.
417// All unary interceptors added by this method will be chained.
418func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
419 return newFuncServerOption(func(o *serverOptions) {
420 o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
421 })
422}
423
424// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
425// server. Only one stream interceptor can be installed.
426func StreamInterceptor(i StreamServerInterceptor) ServerOption {
427 return newFuncServerOption(func(o *serverOptions) {
428 if o.streamInt != nil {
429 panic("The stream server interceptor was already set and may not be reset.")
430 }
431 o.streamInt = i
432 })
433}
434
435// ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
436// for streaming RPCs. The first interceptor will be the outer most,
437// while the last interceptor will be the inner most wrapper around the real call.
438// All stream interceptors added by this method will be chained.
439func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
440 return newFuncServerOption(func(o *serverOptions) {
441 o.chainStreamInts = append(o.chainStreamInts, interceptors...)
442 })
443}
444
445// InTapHandle returns a ServerOption that sets the tap handle for all the server
446// transport to be created. Only one can be installed.
447//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500448// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400449//
450// Notice: This API is EXPERIMENTAL and may be changed or removed in a
451// later release.
452func InTapHandle(h tap.ServerInHandle) ServerOption {
453 return newFuncServerOption(func(o *serverOptions) {
454 if o.inTapHandle != nil {
455 panic("The tap handle was already set and may not be reset.")
456 }
457 o.inTapHandle = h
458 })
459}
460
461// StatsHandler returns a ServerOption that sets the stats handler for the server.
462func StatsHandler(h stats.Handler) ServerOption {
463 return newFuncServerOption(func(o *serverOptions) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530464 if h == nil {
465 logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
466 // Do not allow a nil stats handler, which would otherwise cause
467 // panics.
468 return
469 }
470 o.statsHandlers = append(o.statsHandlers, h)
471 })
472}
473
474// binaryLogger returns a ServerOption that can set the binary logger for the
475// server.
476func binaryLogger(bl binarylog.Logger) ServerOption {
477 return newFuncServerOption(func(o *serverOptions) {
478 o.binaryLogger = bl
khenaidoo5fc5cea2021-08-11 17:39:16 -0400479 })
480}
481
482// UnknownServiceHandler returns a ServerOption that allows for adding a custom
483// unknown service handler. The provided method is a bidi-streaming RPC service
484// handler that will be invoked instead of returning the "unimplemented" gRPC
485// error whenever a request is received for an unregistered service or method.
486// The handling function and stream interceptor (if set) have full access to
487// the ServerStream, including its Context.
488func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
489 return newFuncServerOption(func(o *serverOptions) {
490 o.unknownStreamDesc = &StreamDesc{
491 StreamName: "unknown_service_handler",
492 Handler: streamHandler,
493 // We need to assume that the users of the streamHandler will want to use both.
494 ClientStreams: true,
495 ServerStreams: true,
496 }
497 })
498}
499
500// ConnectionTimeout returns a ServerOption that sets the timeout for
501// connection establishment (up to and including HTTP/2 handshaking) for all
502// new connections. If this is not set, the default is 120 seconds. A zero or
503// negative value will result in an immediate timeout.
504//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500505// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400506//
507// Notice: This API is EXPERIMENTAL and may be changed or removed in a
508// later release.
509func ConnectionTimeout(d time.Duration) ServerOption {
510 return newFuncServerOption(func(o *serverOptions) {
511 o.connectionTimeout = d
512 })
513}
514
515// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
516// of header list that the server is prepared to accept.
517func MaxHeaderListSize(s uint32) ServerOption {
518 return newFuncServerOption(func(o *serverOptions) {
519 o.maxHeaderListSize = &s
520 })
521}
522
523// HeaderTableSize returns a ServerOption that sets the size of dynamic
524// header table for stream.
525//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500526// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400527//
528// Notice: This API is EXPERIMENTAL and may be changed or removed in a
529// later release.
530func HeaderTableSize(s uint32) ServerOption {
531 return newFuncServerOption(func(o *serverOptions) {
532 o.headerTableSize = &s
533 })
534}
535
536// NumStreamWorkers returns a ServerOption that sets the number of worker
537// goroutines that should be used to process incoming streams. Setting this to
538// zero (default) will disable workers and spawn a new goroutine for each
539// stream.
540//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500541// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400542//
543// Notice: This API is EXPERIMENTAL and may be changed or removed in a
544// later release.
545func NumStreamWorkers(numServerWorkers uint32) ServerOption {
546 // TODO: If/when this API gets stabilized (i.e. stream workers become the
547 // only way streams are processed), change the behavior of the zero value to
548 // a sane default. Preliminary experiments suggest that a value equal to the
549 // number of CPUs available is most performant; requires thorough testing.
550 return newFuncServerOption(func(o *serverOptions) {
551 o.numServerWorkers = numServerWorkers
552 })
553}
554
// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
//
// serverWorker uses this value as the number of items it handles before it
// starts its replacement and returns.
const serverWorkerResetThreshold = 1 << 16
561
562// serverWorkers blocks on a *transport.Stream channel forever and waits for
Akash Kankanala761955c2024-02-21 19:32:20 +0530563// data to be fed by serveStreams. This allows multiple requests to be
khenaidoo5fc5cea2021-08-11 17:39:16 -0400564// processed by the same goroutine, removing the need for expensive stack
565// re-allocations (see the runtime.morestack problem [1]).
566//
567// [1] https://github.com/golang/go/issues/18138
Akash Kankanala761955c2024-02-21 19:32:20 +0530568func (s *Server) serverWorker() {
569 for completed := 0; completed < serverWorkerResetThreshold; completed++ {
570 data, ok := <-s.serverWorkerChannel
khenaidoo5fc5cea2021-08-11 17:39:16 -0400571 if !ok {
572 return
573 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530574 s.handleSingleStream(data)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400575 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530576 go s.serverWorker()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400577}
578
Akash Kankanala761955c2024-02-21 19:32:20 +0530579func (s *Server) handleSingleStream(data *serverWorkerData) {
580 defer data.wg.Done()
581 s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
582}
583
584// initServerWorkers creates worker goroutines and a channel to process incoming
khenaidoo5fc5cea2021-08-11 17:39:16 -0400585// connections to reduce the time spent overall on runtime.morestack.
586func (s *Server) initServerWorkers() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530587 s.serverWorkerChannel = make(chan *serverWorkerData)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400588 for i := uint32(0); i < s.opts.numServerWorkers; i++ {
Akash Kankanala761955c2024-02-21 19:32:20 +0530589 go s.serverWorker()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400590 }
591}
592
593func (s *Server) stopServerWorkers() {
Akash Kankanala761955c2024-02-21 19:32:20 +0530594 close(s.serverWorkerChannel)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400595}
596
597// NewServer creates a gRPC server which has no service registered and has not
598// started to accept requests yet.
599func NewServer(opt ...ServerOption) *Server {
600 opts := defaultServerOptions
Akash Kankanala761955c2024-02-21 19:32:20 +0530601 for _, o := range globalServerOptions {
602 o.apply(&opts)
603 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400604 for _, o := range opt {
605 o.apply(&opts)
606 }
607 s := &Server{
608 lis: make(map[net.Listener]bool),
609 opts: opts,
610 conns: make(map[string]map[transport.ServerTransport]bool),
611 services: make(map[string]*serviceInfo),
612 quit: grpcsync.NewEvent(),
613 done: grpcsync.NewEvent(),
614 czData: new(channelzData),
615 }
616 chainUnaryServerInterceptors(s)
617 chainStreamServerInterceptors(s)
618 s.cv = sync.NewCond(&s.mu)
619 if EnableTracing {
620 _, file, line, _ := runtime.Caller(1)
621 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
622 }
623
624 if s.opts.numServerWorkers > 0 {
625 s.initServerWorkers()
626 }
627
Akash Kankanala761955c2024-02-21 19:32:20 +0530628 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
629 channelz.Info(logger, s.channelzID, "Server created")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400630 return s
631}
632
633// printf records an event in s's event log, unless s has been stopped.
634// REQUIRES s.mu is held.
635func (s *Server) printf(format string, a ...interface{}) {
636 if s.events != nil {
637 s.events.Printf(format, a...)
638 }
639}
640
641// errorf records an error in s's event log, unless s has been stopped.
642// REQUIRES s.mu is held.
643func (s *Server) errorf(format string, a ...interface{}) {
644 if s.events != nil {
645 s.events.Errorf(format, a...)
646 }
647}
648
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
//
// *Server satisfies this interface through its RegisterService method.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
660
661// RegisterService registers a service and its implementation to the gRPC
662// server. It is called from the IDL generated code. This must be called before
663// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
664// ensure it implements sd.HandlerType.
665func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
666 if ss != nil {
667 ht := reflect.TypeOf(sd.HandlerType).Elem()
668 st := reflect.TypeOf(ss)
669 if !st.Implements(ht) {
670 logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
671 }
672 }
673 s.register(sd, ss)
674}
675
676func (s *Server) register(sd *ServiceDesc, ss interface{}) {
677 s.mu.Lock()
678 defer s.mu.Unlock()
679 s.printf("RegisterService(%q)", sd.ServiceName)
680 if s.serve {
681 logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
682 }
683 if _, ok := s.services[sd.ServiceName]; ok {
684 logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
685 }
686 info := &serviceInfo{
687 serviceImpl: ss,
688 methods: make(map[string]*MethodDesc),
689 streams: make(map[string]*StreamDesc),
690 mdata: sd.Metadata,
691 }
692 for i := range sd.Methods {
693 d := &sd.Methods[i]
694 info.methods[d.MethodName] = d
695 }
696 for i := range sd.Streams {
697 d := &sd.Streams[i]
698 info.streams[d.StreamName] = d
699 }
700 s.services[sd.ServiceName] = info
701}
702
// MethodInfo contains the information of an RPC including its method name and type.
// Values of this type are produced by Server.GetServiceInfo.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
712
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists the service's unary and streaming methods together.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
719
720// GetServiceInfo returns a map from service names to ServiceInfo.
721// Service names include the package names, in the form of <package>.<service>.
722func (s *Server) GetServiceInfo() map[string]ServiceInfo {
723 ret := make(map[string]ServiceInfo)
724 for n, srv := range s.services {
725 methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
726 for m := range srv.methods {
727 methods = append(methods, MethodInfo{
728 Name: m,
729 IsClientStream: false,
730 IsServerStream: false,
731 })
732 }
733 for m, d := range srv.streams {
734 methods = append(methods, MethodInfo{
735 Name: m,
736 IsClientStream: d.ClientStreams,
737 IsServerStream: d.ServerStreams,
738 })
739 }
740
741 ret[n] = ServiceInfo{
742 Methods: methods,
743 Metadata: srv.mdata,
744 }
745 }
746 return ret
747}
748
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server whose
// listener set has already been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
752
// listenSocket pairs a net.Listener with the channelz identifier it was
// registered under. Serve wraps each listener in one and tracks it in s.lis.
type listenSocket struct {
	net.Listener
	// channelzID is assigned by Serve from channelz.RegisterListenSocket.
	channelzID *channelz.Identifier
}
757
758func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
759 return &channelz.SocketInternalMetric{
760 SocketOptions: channelz.GetSocketOption(l.Listener),
761 LocalAddr: l.Listener.Addr(),
762 }
763}
764
765func (l *listenSocket) Close() error {
766 err := l.Listener.Close()
Akash Kankanala761955c2024-02-21 19:32:20 +0530767 channelz.RemoveEntry(l.channelzID)
768 channelz.Info(logger, l.channelzID, "ListenSocket deleted")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400769 return err
770}
771
772// Serve accepts incoming connections on the listener lis, creating a new
773// ServerTransport and service goroutine for each. The service goroutines
774// read gRPC requests and then call the registered handlers to reply to them.
775// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
776// this method returns.
777// Serve will return a non-nil error unless Stop or GracefulStop is called.
778func (s *Server) Serve(lis net.Listener) error {
779 s.mu.Lock()
780 s.printf("serving")
781 s.serve = true
782 if s.lis == nil {
783 // Serve called after Stop or GracefulStop.
784 s.mu.Unlock()
785 lis.Close()
786 return ErrServerStopped
787 }
788
789 s.serveWG.Add(1)
790 defer func() {
791 s.serveWG.Done()
792 if s.quit.HasFired() {
793 // Stop or GracefulStop called; block until done and return nil.
794 <-s.done.Done()
795 }
796 }()
797
798 ls := &listenSocket{Listener: lis}
799 s.lis[ls] = true
800
khenaidoo5fc5cea2021-08-11 17:39:16 -0400801 defer func() {
802 s.mu.Lock()
803 if s.lis != nil && s.lis[ls] {
804 ls.Close()
805 delete(s.lis, ls)
806 }
807 s.mu.Unlock()
808 }()
809
Akash Kankanala761955c2024-02-21 19:32:20 +0530810 var err error
811 ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
812 if err != nil {
813 s.mu.Unlock()
814 return err
815 }
816 s.mu.Unlock()
817 channelz.Info(logger, ls.channelzID, "ListenSocket created")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400818
Akash Kankanala761955c2024-02-21 19:32:20 +0530819 var tempDelay time.Duration // how long to sleep on accept failure
khenaidoo5fc5cea2021-08-11 17:39:16 -0400820 for {
821 rawConn, err := lis.Accept()
822 if err != nil {
823 if ne, ok := err.(interface {
824 Temporary() bool
825 }); ok && ne.Temporary() {
826 if tempDelay == 0 {
827 tempDelay = 5 * time.Millisecond
828 } else {
829 tempDelay *= 2
830 }
831 if max := 1 * time.Second; tempDelay > max {
832 tempDelay = max
833 }
834 s.mu.Lock()
835 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
836 s.mu.Unlock()
837 timer := time.NewTimer(tempDelay)
838 select {
839 case <-timer.C:
840 case <-s.quit.Done():
841 timer.Stop()
842 return nil
843 }
844 continue
845 }
846 s.mu.Lock()
847 s.printf("done serving; Accept = %v", err)
848 s.mu.Unlock()
849
850 if s.quit.HasFired() {
851 return nil
852 }
853 return err
854 }
855 tempDelay = 0
856 // Start a new goroutine to deal with rawConn so we don't stall this Accept
857 // loop goroutine.
858 //
859 // Make sure we account for the goroutine so GracefulStop doesn't nil out
860 // s.conns before this conn can be added.
861 s.serveWG.Add(1)
862 go func() {
863 s.handleRawConn(lis.Addr().String(), rawConn)
864 s.serveWG.Done()
865 }()
866 }
867}
868
869// handleRawConn forks a goroutine to handle a just-accepted connection that
870// has not had any I/O performed on it yet.
871func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
872 if s.quit.HasFired() {
873 rawConn.Close()
874 return
875 }
876 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
877
878 // Finish handshaking (HTTP2)
879 st := s.newHTTP2Transport(rawConn)
880 rawConn.SetDeadline(time.Time{})
881 if st == nil {
882 return
883 }
884
885 if !s.addConn(lisAddr, st) {
886 return
887 }
888 go func() {
889 s.serveStreams(st)
890 s.removeConn(lisAddr, st)
891 }()
892}
893
894func (s *Server) drainServerTransports(addr string) {
895 s.mu.Lock()
896 conns := s.conns[addr]
897 for st := range conns {
Akash Kankanala761955c2024-02-21 19:32:20 +0530898 st.Drain("")
khenaidoo5fc5cea2021-08-11 17:39:16 -0400899 }
900 s.mu.Unlock()
901}
902
903// newHTTP2Transport sets up a http/2 transport (using the
904// gRPC http2 server transport in transport/http2_server.go).
905func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
906 config := &transport.ServerConfig{
907 MaxStreams: s.opts.maxConcurrentStreams,
908 ConnectionTimeout: s.opts.connectionTimeout,
909 Credentials: s.opts.creds,
910 InTapHandle: s.opts.inTapHandle,
Akash Kankanala761955c2024-02-21 19:32:20 +0530911 StatsHandlers: s.opts.statsHandlers,
khenaidoo5fc5cea2021-08-11 17:39:16 -0400912 KeepaliveParams: s.opts.keepaliveParams,
913 KeepalivePolicy: s.opts.keepalivePolicy,
914 InitialWindowSize: s.opts.initialWindowSize,
915 InitialConnWindowSize: s.opts.initialConnWindowSize,
916 WriteBufferSize: s.opts.writeBufferSize,
917 ReadBufferSize: s.opts.readBufferSize,
918 ChannelzParentID: s.channelzID,
919 MaxHeaderListSize: s.opts.maxHeaderListSize,
920 HeaderTableSize: s.opts.headerTableSize,
921 }
922 st, err := transport.NewServerTransport(c, config)
923 if err != nil {
924 s.mu.Lock()
925 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
926 s.mu.Unlock()
927 // ErrConnDispatched means that the connection was dispatched away from
928 // gRPC; those connections should be left open.
929 if err != credentials.ErrConnDispatched {
khenaidoo5cb0d402021-12-08 14:09:16 -0500930 // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400931 if err != io.EOF {
Akash Kankanala761955c2024-02-21 19:32:20 +0530932 channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400933 }
khenaidoo5cb0d402021-12-08 14:09:16 -0500934 c.Close()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400935 }
936 return nil
937 }
938
939 return st
940}
941
942func (s *Server) serveStreams(st transport.ServerTransport) {
Akash Kankanala761955c2024-02-21 19:32:20 +0530943 defer st.Close(errors.New("finished serving streams for the server transport"))
khenaidoo5fc5cea2021-08-11 17:39:16 -0400944 var wg sync.WaitGroup
945
khenaidoo5fc5cea2021-08-11 17:39:16 -0400946 st.HandleStreams(func(stream *transport.Stream) {
947 wg.Add(1)
948 if s.opts.numServerWorkers > 0 {
949 data := &serverWorkerData{st: st, wg: &wg, stream: stream}
950 select {
Akash Kankanala761955c2024-02-21 19:32:20 +0530951 case s.serverWorkerChannel <- data:
952 return
khenaidoo5fc5cea2021-08-11 17:39:16 -0400953 default:
954 // If all stream workers are busy, fallback to the default code path.
khenaidoo5fc5cea2021-08-11 17:39:16 -0400955 }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400956 }
Akash Kankanala761955c2024-02-21 19:32:20 +0530957 go func() {
958 defer wg.Done()
959 s.handleStream(st, stream, s.traceInfo(st, stream))
960 }()
khenaidoo5fc5cea2021-08-11 17:39:16 -0400961 }, func(ctx context.Context, method string) context.Context {
962 if !EnableTracing {
963 return ctx
964 }
965 tr := trace.New("grpc.Recv."+methodFamily(method), method)
966 return trace.NewContext(ctx, tr)
967 })
968 wg.Wait()
969}
970
971var _ http.Handler = (*Server)(nil)
972
973// ServeHTTP implements the Go standard library's http.Handler
974// interface by responding to the gRPC request r, by looking up
975// the requested gRPC method in the gRPC server s.
976//
977// The provided HTTP request must have arrived on an HTTP/2
978// connection. When using the Go standard library's server,
979// practically this means that the Request must also have arrived
980// over TLS.
981//
982// To share one port (such as 443 for https) between gRPC and an
983// existing http.Handler, use a root http.Handler such as:
984//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500985// if r.ProtoMajor == 2 && strings.HasPrefix(
986// r.Header.Get("Content-Type"), "application/grpc") {
987// grpcServer.ServeHTTP(w, r)
988// } else {
989// yourMux.ServeHTTP(w, r)
990// }
khenaidoo5fc5cea2021-08-11 17:39:16 -0400991//
992// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
993// separate from grpc-go's HTTP/2 server. Performance and features may vary
994// between the two paths. ServeHTTP does not support some gRPC features
995// available through grpc-go's HTTP/2 server.
996//
Joey Armstrongba3d9d12024-01-15 14:22:11 -0500997// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -0400998//
999// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1000// later release.
1001func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Akash Kankanala761955c2024-02-21 19:32:20 +05301002 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001003 if err != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301004 // Errors returned from transport.NewServerHandlerTransport have
1005 // already been written to w.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001006 return
1007 }
1008 if !s.addConn(listenerAddressForServeHTTP, st) {
1009 return
1010 }
1011 defer s.removeConn(listenerAddressForServeHTTP, st)
1012 s.serveStreams(st)
1013}
1014
1015// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
1016// If tracing is not enabled, it returns nil.
1017func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
1018 if !EnableTracing {
1019 return nil
1020 }
1021 tr, ok := trace.FromContext(stream.Context())
1022 if !ok {
1023 return nil
1024 }
1025
1026 trInfo = &traceInfo{
1027 tr: tr,
1028 firstLine: firstLine{
1029 client: false,
1030 remoteAddr: st.RemoteAddr(),
1031 },
1032 }
1033 if dl, ok := stream.Context().Deadline(); ok {
1034 trInfo.firstLine.deadline = time.Until(dl)
1035 }
1036 return trInfo
1037}
1038
1039func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
1040 s.mu.Lock()
1041 defer s.mu.Unlock()
1042 if s.conns == nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301043 st.Close(errors.New("Server.addConn called when server has already been stopped"))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001044 return false
1045 }
1046 if s.drain {
1047 // Transport added after we drained our existing conns: drain it
1048 // immediately.
Akash Kankanala761955c2024-02-21 19:32:20 +05301049 st.Drain("")
khenaidoo5fc5cea2021-08-11 17:39:16 -04001050 }
1051
1052 if s.conns[addr] == nil {
1053 // Create a map entry if this is the first connection on this listener.
1054 s.conns[addr] = make(map[transport.ServerTransport]bool)
1055 }
1056 s.conns[addr][st] = true
1057 return true
1058}
1059
1060func (s *Server) removeConn(addr string, st transport.ServerTransport) {
1061 s.mu.Lock()
1062 defer s.mu.Unlock()
1063
1064 conns := s.conns[addr]
1065 if conns != nil {
1066 delete(conns, st)
1067 if len(conns) == 0 {
1068 // If the last connection for this address is being removed, also
1069 // remove the map entry corresponding to the address. This is used
1070 // in GracefulStop() when waiting for all connections to be closed.
1071 delete(s.conns, addr)
1072 }
1073 s.cv.Broadcast()
1074 }
1075}
1076
1077func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
1078 return &channelz.ServerInternalMetric{
1079 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
1080 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
1081 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
1082 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
1083 }
1084}
1085
1086func (s *Server) incrCallsStarted() {
1087 atomic.AddInt64(&s.czData.callsStarted, 1)
1088 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
1089}
1090
1091func (s *Server) incrCallsSucceeded() {
1092 atomic.AddInt64(&s.czData.callsSucceeded, 1)
1093}
1094
1095func (s *Server) incrCallsFailed() {
1096 atomic.AddInt64(&s.czData.callsFailed, 1)
1097}
1098
1099func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
1100 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
1101 if err != nil {
1102 channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
1103 return err
1104 }
1105 compData, err := compress(data, cp, comp)
1106 if err != nil {
1107 channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
1108 return err
1109 }
1110 hdr, payload := msgHeader(data, compData)
1111 // TODO(dfawley): should we be checking len(data) instead?
1112 if len(payload) > s.opts.maxSendMessageSize {
1113 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
1114 }
1115 err = t.Write(stream, hdr, payload, opts)
Akash Kankanala761955c2024-02-21 19:32:20 +05301116 if err == nil {
1117 for _, sh := range s.opts.statsHandlers {
1118 sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
1119 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001120 }
1121 return err
1122}
1123
1124// chainUnaryServerInterceptors chains all unary server interceptors into one.
1125func chainUnaryServerInterceptors(s *Server) {
1126 // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
1127 // be executed before any other chained interceptors.
1128 interceptors := s.opts.chainUnaryInts
1129 if s.opts.unaryInt != nil {
1130 interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
1131 }
1132
1133 var chainedInt UnaryServerInterceptor
1134 if len(interceptors) == 0 {
1135 chainedInt = nil
1136 } else if len(interceptors) == 1 {
1137 chainedInt = interceptors[0]
1138 } else {
1139 chainedInt = chainUnaryInterceptors(interceptors)
1140 }
1141
1142 s.opts.unaryInt = chainedInt
1143}
1144
1145func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
1146 return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
Akash Kankanala761955c2024-02-21 19:32:20 +05301147 return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
1148 }
1149}
1150
1151func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
1152 if curr == len(interceptors)-1 {
1153 return finalHandler
1154 }
1155 return func(ctx context.Context, req interface{}) (interface{}, error) {
1156 return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001157 }
1158}
1159
1160func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
Akash Kankanala761955c2024-02-21 19:32:20 +05301161 shs := s.opts.statsHandlers
1162 if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001163 if channelz.IsOn() {
1164 s.incrCallsStarted()
1165 }
1166 var statsBegin *stats.Begin
Akash Kankanala761955c2024-02-21 19:32:20 +05301167 for _, sh := range shs {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001168 beginTime := time.Now()
1169 statsBegin = &stats.Begin{
1170 BeginTime: beginTime,
1171 IsClientStream: false,
1172 IsServerStream: false,
1173 }
1174 sh.HandleRPC(stream.Context(), statsBegin)
1175 }
1176 if trInfo != nil {
1177 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1178 }
1179 // The deferred error handling for tracing, stats handler and channelz are
1180 // combined into one function to reduce stack usage -- a defer takes ~56-64
1181 // bytes on the stack, so overflowing the stack will require a stack
1182 // re-allocation, which is expensive.
1183 //
1184 // To maintain behavior similar to separate deferred statements, statements
1185 // should be executed in the reverse order. That is, tracing first, stats
1186 // handler second, and channelz last. Note that panics *within* defers will
1187 // lead to different behavior, but that's an acceptable compromise; that
1188 // would be undefined behavior territory anyway.
1189 defer func() {
1190 if trInfo != nil {
1191 if err != nil && err != io.EOF {
1192 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1193 trInfo.tr.SetError()
1194 }
1195 trInfo.tr.Finish()
1196 }
1197
Akash Kankanala761955c2024-02-21 19:32:20 +05301198 for _, sh := range shs {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001199 end := &stats.End{
1200 BeginTime: statsBegin.BeginTime,
1201 EndTime: time.Now(),
1202 }
1203 if err != nil && err != io.EOF {
1204 end.Error = toRPCErr(err)
1205 }
1206 sh.HandleRPC(stream.Context(), end)
1207 }
1208
1209 if channelz.IsOn() {
1210 if err != nil && err != io.EOF {
1211 s.incrCallsFailed()
1212 } else {
1213 s.incrCallsSucceeded()
1214 }
1215 }
1216 }()
1217 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301218 var binlogs []binarylog.MethodLogger
1219 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1220 binlogs = append(binlogs, ml)
1221 }
1222 if s.opts.binaryLogger != nil {
1223 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1224 binlogs = append(binlogs, ml)
1225 }
1226 }
1227 if len(binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001228 ctx := stream.Context()
1229 md, _ := metadata.FromIncomingContext(ctx)
1230 logEntry := &binarylog.ClientHeader{
1231 Header: md,
1232 MethodName: stream.Method(),
1233 PeerAddr: nil,
1234 }
1235 if deadline, ok := ctx.Deadline(); ok {
1236 logEntry.Timeout = time.Until(deadline)
1237 if logEntry.Timeout < 0 {
1238 logEntry.Timeout = 0
1239 }
1240 }
1241 if a := md[":authority"]; len(a) > 0 {
1242 logEntry.Authority = a[0]
1243 }
1244 if peer, ok := peer.FromContext(ctx); ok {
1245 logEntry.PeerAddr = peer.Addr
1246 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301247 for _, binlog := range binlogs {
1248 binlog.Log(ctx, logEntry)
1249 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001250 }
1251
1252 // comp and cp are used for compression. decomp and dc are used for
1253 // decompression. If comp and decomp are both set, they are the same;
1254 // however they are kept separate to ensure that at most one of the
1255 // compressor/decompressor variable pairs are set for use later.
1256 var comp, decomp encoding.Compressor
1257 var cp Compressor
1258 var dc Decompressor
Akash Kankanala761955c2024-02-21 19:32:20 +05301259 var sendCompressorName string
khenaidoo5fc5cea2021-08-11 17:39:16 -04001260
1261 // If dc is set and matches the stream's compression, use it. Otherwise, try
1262 // to find a matching registered compressor for decomp.
1263 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1264 dc = s.opts.dc
1265 } else if rc != "" && rc != encoding.Identity {
1266 decomp = encoding.GetCompressor(rc)
1267 if decomp == nil {
1268 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1269 t.WriteStatus(stream, st)
1270 return st.Err()
1271 }
1272 }
1273
1274 // If cp is set, use it. Otherwise, attempt to compress the response using
1275 // the incoming message compression method.
1276 //
1277 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1278 if s.opts.cp != nil {
1279 cp = s.opts.cp
Akash Kankanala761955c2024-02-21 19:32:20 +05301280 sendCompressorName = cp.Type()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001281 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1282 // Legacy compressor not specified; attempt to respond with same encoding.
1283 comp = encoding.GetCompressor(rc)
1284 if comp != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301285 sendCompressorName = comp.Name()
1286 }
1287 }
1288
1289 if sendCompressorName != "" {
1290 if err := stream.SetSendCompress(sendCompressorName); err != nil {
1291 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001292 }
1293 }
1294
1295 var payInfo *payloadInfo
Akash Kankanala761955c2024-02-21 19:32:20 +05301296 if len(shs) != 0 || len(binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001297 payInfo = &payloadInfo{}
1298 }
1299 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
1300 if err != nil {
1301 if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301302 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001303 }
1304 return err
1305 }
1306 if channelz.IsOn() {
1307 t.IncrMsgRecv()
1308 }
1309 df := func(v interface{}) error {
1310 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
1311 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
1312 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301313 for _, sh := range shs {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001314 sh.HandleRPC(stream.Context(), &stats.InPayload{
Akash Kankanala761955c2024-02-21 19:32:20 +05301315 RecvTime: time.Now(),
1316 Payload: v,
1317 Length: len(d),
1318 WireLength: payInfo.compressedLength + headerLen,
1319 CompressedLength: payInfo.compressedLength,
1320 Data: d,
khenaidoo5fc5cea2021-08-11 17:39:16 -04001321 })
1322 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301323 if len(binlogs) != 0 {
1324 cm := &binarylog.ClientMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001325 Message: d,
Akash Kankanala761955c2024-02-21 19:32:20 +05301326 }
1327 for _, binlog := range binlogs {
1328 binlog.Log(stream.Context(), cm)
1329 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001330 }
1331 if trInfo != nil {
1332 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
1333 }
1334 return nil
1335 }
1336 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1337 reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
1338 if appErr != nil {
1339 appStatus, ok := status.FromError(appErr)
1340 if !ok {
Akash Kankanala761955c2024-02-21 19:32:20 +05301341 // Convert non-status application error to a status error with code
1342 // Unknown, but handle context errors specifically.
1343 appStatus = status.FromContextError(appErr)
1344 appErr = appStatus.Err()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001345 }
1346 if trInfo != nil {
1347 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1348 trInfo.tr.SetError()
1349 }
1350 if e := t.WriteStatus(stream, appStatus); e != nil {
1351 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1352 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301353 if len(binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001354 if h, _ := stream.Header(); h.Len() > 0 {
1355 // Only log serverHeader if there was header. Otherwise it can
1356 // be trailer only.
Akash Kankanala761955c2024-02-21 19:32:20 +05301357 sh := &binarylog.ServerHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001358 Header: h,
Akash Kankanala761955c2024-02-21 19:32:20 +05301359 }
1360 for _, binlog := range binlogs {
1361 binlog.Log(stream.Context(), sh)
1362 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001363 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301364 st := &binarylog.ServerTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001365 Trailer: stream.Trailer(),
1366 Err: appErr,
Akash Kankanala761955c2024-02-21 19:32:20 +05301367 }
1368 for _, binlog := range binlogs {
1369 binlog.Log(stream.Context(), st)
1370 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001371 }
1372 return appErr
1373 }
1374 if trInfo != nil {
1375 trInfo.tr.LazyLog(stringer("OK"), false)
1376 }
1377 opts := &transport.Options{Last: true}
1378
Akash Kankanala761955c2024-02-21 19:32:20 +05301379 // Server handler could have set new compressor by calling SetSendCompressor.
1380 // In case it is set, we need to use it for compressing outbound message.
1381 if stream.SendCompress() != sendCompressorName {
1382 comp = encoding.GetCompressor(stream.SendCompress())
1383 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001384 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
1385 if err == io.EOF {
1386 // The entire stream is done (for unary RPC only).
1387 return err
1388 }
1389 if sts, ok := status.FromError(err); ok {
1390 if e := t.WriteStatus(stream, sts); e != nil {
1391 channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
1392 }
1393 } else {
1394 switch st := err.(type) {
1395 case transport.ConnectionError:
1396 // Nothing to do here.
1397 default:
1398 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1399 }
1400 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301401 if len(binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001402 h, _ := stream.Header()
Akash Kankanala761955c2024-02-21 19:32:20 +05301403 sh := &binarylog.ServerHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001404 Header: h,
Akash Kankanala761955c2024-02-21 19:32:20 +05301405 }
1406 st := &binarylog.ServerTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001407 Trailer: stream.Trailer(),
1408 Err: appErr,
Akash Kankanala761955c2024-02-21 19:32:20 +05301409 }
1410 for _, binlog := range binlogs {
1411 binlog.Log(stream.Context(), sh)
1412 binlog.Log(stream.Context(), st)
1413 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001414 }
1415 return err
1416 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301417 if len(binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001418 h, _ := stream.Header()
Akash Kankanala761955c2024-02-21 19:32:20 +05301419 sh := &binarylog.ServerHeader{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001420 Header: h,
Akash Kankanala761955c2024-02-21 19:32:20 +05301421 }
1422 sm := &binarylog.ServerMessage{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001423 Message: reply,
Akash Kankanala761955c2024-02-21 19:32:20 +05301424 }
1425 for _, binlog := range binlogs {
1426 binlog.Log(stream.Context(), sh)
1427 binlog.Log(stream.Context(), sm)
1428 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001429 }
1430 if channelz.IsOn() {
1431 t.IncrMsgSent()
1432 }
1433 if trInfo != nil {
1434 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1435 }
1436 // TODO: Should we be logging if writing status failed here, like above?
1437 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1438 // error or allow the stats handler to see it?
Akash Kankanala761955c2024-02-21 19:32:20 +05301439 if len(binlogs) != 0 {
1440 st := &binarylog.ServerTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001441 Trailer: stream.Trailer(),
1442 Err: appErr,
Akash Kankanala761955c2024-02-21 19:32:20 +05301443 }
1444 for _, binlog := range binlogs {
1445 binlog.Log(stream.Context(), st)
1446 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001447 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301448 return t.WriteStatus(stream, statusOK)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001449}
1450
1451// chainStreamServerInterceptors chains all stream server interceptors into one.
1452func chainStreamServerInterceptors(s *Server) {
1453 // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
1454 // be executed before any other chained interceptors.
1455 interceptors := s.opts.chainStreamInts
1456 if s.opts.streamInt != nil {
1457 interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
1458 }
1459
1460 var chainedInt StreamServerInterceptor
1461 if len(interceptors) == 0 {
1462 chainedInt = nil
1463 } else if len(interceptors) == 1 {
1464 chainedInt = interceptors[0]
1465 } else {
1466 chainedInt = chainStreamInterceptors(interceptors)
1467 }
1468
1469 s.opts.streamInt = chainedInt
1470}
1471
1472func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
1473 return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
Akash Kankanala761955c2024-02-21 19:32:20 +05301474 return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
1475 }
1476}
1477
1478func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
1479 if curr == len(interceptors)-1 {
1480 return finalHandler
1481 }
1482 return func(srv interface{}, stream ServerStream) error {
1483 return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001484 }
1485}
1486
1487func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
1488 if channelz.IsOn() {
1489 s.incrCallsStarted()
1490 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301491 shs := s.opts.statsHandlers
khenaidoo5fc5cea2021-08-11 17:39:16 -04001492 var statsBegin *stats.Begin
Akash Kankanala761955c2024-02-21 19:32:20 +05301493 if len(shs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001494 beginTime := time.Now()
1495 statsBegin = &stats.Begin{
1496 BeginTime: beginTime,
1497 IsClientStream: sd.ClientStreams,
1498 IsServerStream: sd.ServerStreams,
1499 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301500 for _, sh := range shs {
1501 sh.HandleRPC(stream.Context(), statsBegin)
1502 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001503 }
1504 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
1505 ss := &serverStream{
1506 ctx: ctx,
1507 t: t,
1508 s: stream,
1509 p: &parser{r: stream},
1510 codec: s.getCodec(stream.ContentSubtype()),
1511 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1512 maxSendMessageSize: s.opts.maxSendMessageSize,
1513 trInfo: trInfo,
Akash Kankanala761955c2024-02-21 19:32:20 +05301514 statsHandler: shs,
khenaidoo5fc5cea2021-08-11 17:39:16 -04001515 }
1516
Akash Kankanala761955c2024-02-21 19:32:20 +05301517 if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001518 // See comment in processUnaryRPC on defers.
1519 defer func() {
1520 if trInfo != nil {
1521 ss.mu.Lock()
1522 if err != nil && err != io.EOF {
1523 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1524 ss.trInfo.tr.SetError()
1525 }
1526 ss.trInfo.tr.Finish()
1527 ss.trInfo.tr = nil
1528 ss.mu.Unlock()
1529 }
1530
Akash Kankanala761955c2024-02-21 19:32:20 +05301531 if len(shs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001532 end := &stats.End{
1533 BeginTime: statsBegin.BeginTime,
1534 EndTime: time.Now(),
1535 }
1536 if err != nil && err != io.EOF {
1537 end.Error = toRPCErr(err)
1538 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301539 for _, sh := range shs {
1540 sh.HandleRPC(stream.Context(), end)
1541 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001542 }
1543
1544 if channelz.IsOn() {
1545 if err != nil && err != io.EOF {
1546 s.incrCallsFailed()
1547 } else {
1548 s.incrCallsSucceeded()
1549 }
1550 }
1551 }()
1552 }
1553
Akash Kankanala761955c2024-02-21 19:32:20 +05301554 if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
1555 ss.binlogs = append(ss.binlogs, ml)
1556 }
1557 if s.opts.binaryLogger != nil {
1558 if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
1559 ss.binlogs = append(ss.binlogs, ml)
1560 }
1561 }
1562 if len(ss.binlogs) != 0 {
khenaidoo5fc5cea2021-08-11 17:39:16 -04001563 md, _ := metadata.FromIncomingContext(ctx)
1564 logEntry := &binarylog.ClientHeader{
1565 Header: md,
1566 MethodName: stream.Method(),
1567 PeerAddr: nil,
1568 }
1569 if deadline, ok := ctx.Deadline(); ok {
1570 logEntry.Timeout = time.Until(deadline)
1571 if logEntry.Timeout < 0 {
1572 logEntry.Timeout = 0
1573 }
1574 }
1575 if a := md[":authority"]; len(a) > 0 {
1576 logEntry.Authority = a[0]
1577 }
1578 if peer, ok := peer.FromContext(ss.Context()); ok {
1579 logEntry.PeerAddr = peer.Addr
1580 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301581 for _, binlog := range ss.binlogs {
1582 binlog.Log(stream.Context(), logEntry)
1583 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001584 }
1585
1586 // If dc is set and matches the stream's compression, use it. Otherwise, try
1587 // to find a matching registered compressor for decomp.
1588 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1589 ss.dc = s.opts.dc
1590 } else if rc != "" && rc != encoding.Identity {
1591 ss.decomp = encoding.GetCompressor(rc)
1592 if ss.decomp == nil {
1593 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1594 t.WriteStatus(ss.s, st)
1595 return st.Err()
1596 }
1597 }
1598
1599 // If cp is set, use it. Otherwise, attempt to compress the response using
1600 // the incoming message compression method.
1601 //
1602 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1603 if s.opts.cp != nil {
1604 ss.cp = s.opts.cp
Akash Kankanala761955c2024-02-21 19:32:20 +05301605 ss.sendCompressorName = s.opts.cp.Type()
khenaidoo5fc5cea2021-08-11 17:39:16 -04001606 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1607 // Legacy compressor not specified; attempt to respond with same encoding.
1608 ss.comp = encoding.GetCompressor(rc)
1609 if ss.comp != nil {
Akash Kankanala761955c2024-02-21 19:32:20 +05301610 ss.sendCompressorName = rc
1611 }
1612 }
1613
1614 if ss.sendCompressorName != "" {
1615 if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
1616 return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001617 }
1618 }
1619
1620 ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
1621
1622 if trInfo != nil {
1623 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1624 }
1625 var appErr error
1626 var server interface{}
1627 if info != nil {
1628 server = info.serviceImpl
1629 }
1630 if s.opts.streamInt == nil {
1631 appErr = sd.Handler(server, ss)
1632 } else {
1633 info := &StreamServerInfo{
1634 FullMethod: stream.Method(),
1635 IsClientStream: sd.ClientStreams,
1636 IsServerStream: sd.ServerStreams,
1637 }
1638 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1639 }
1640 if appErr != nil {
1641 appStatus, ok := status.FromError(appErr)
1642 if !ok {
Akash Kankanala761955c2024-02-21 19:32:20 +05301643 // Convert non-status application error to a status error with code
1644 // Unknown, but handle context errors specifically.
1645 appStatus = status.FromContextError(appErr)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001646 appErr = appStatus.Err()
1647 }
1648 if trInfo != nil {
1649 ss.mu.Lock()
1650 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1651 ss.trInfo.tr.SetError()
1652 ss.mu.Unlock()
1653 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301654 if len(ss.binlogs) != 0 {
1655 st := &binarylog.ServerTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001656 Trailer: ss.s.Trailer(),
1657 Err: appErr,
Akash Kankanala761955c2024-02-21 19:32:20 +05301658 }
1659 for _, binlog := range ss.binlogs {
1660 binlog.Log(stream.Context(), st)
1661 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001662 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301663 t.WriteStatus(ss.s, appStatus)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001664 // TODO: Should we log an error from WriteStatus here and below?
1665 return appErr
1666 }
1667 if trInfo != nil {
1668 ss.mu.Lock()
1669 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1670 ss.mu.Unlock()
1671 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301672 if len(ss.binlogs) != 0 {
1673 st := &binarylog.ServerTrailer{
khenaidoo5fc5cea2021-08-11 17:39:16 -04001674 Trailer: ss.s.Trailer(),
1675 Err: appErr,
Akash Kankanala761955c2024-02-21 19:32:20 +05301676 }
1677 for _, binlog := range ss.binlogs {
1678 binlog.Log(stream.Context(), st)
1679 }
khenaidoo5fc5cea2021-08-11 17:39:16 -04001680 }
Akash Kankanala761955c2024-02-21 19:32:20 +05301681 return t.WriteStatus(ss.s, statusOK)
khenaidoo5fc5cea2021-08-11 17:39:16 -04001682}
1683
1684func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1685 sm := stream.Method()
1686 if sm != "" && sm[0] == '/' {
1687 sm = sm[1:]
1688 }
1689 pos := strings.LastIndex(sm, "/")
1690 if pos == -1 {
1691 if trInfo != nil {
1692 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1693 trInfo.tr.SetError()
1694 }
1695 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1696 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1697 if trInfo != nil {
1698 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1699 trInfo.tr.SetError()
1700 }
1701 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1702 }
1703 if trInfo != nil {
1704 trInfo.tr.Finish()
1705 }
1706 return
1707 }
1708 service := sm[:pos]
1709 method := sm[pos+1:]
1710
1711 srv, knownService := s.services[service]
1712 if knownService {
1713 if md, ok := srv.methods[method]; ok {
1714 s.processUnaryRPC(t, stream, srv, md, trInfo)
1715 return
1716 }
1717 if sd, ok := srv.streams[method]; ok {
1718 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1719 return
1720 }
1721 }
1722 // Unknown service, or known server unknown method.
1723 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1724 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1725 return
1726 }
1727 var errDesc string
1728 if !knownService {
1729 errDesc = fmt.Sprintf("unknown service %v", service)
1730 } else {
1731 errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
1732 }
1733 if trInfo != nil {
1734 trInfo.tr.LazyPrintf("%s", errDesc)
1735 trInfo.tr.SetError()
1736 }
1737 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1738 if trInfo != nil {
1739 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1740 trInfo.tr.SetError()
1741 }
1742 channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
1743 }
1744 if trInfo != nil {
1745 trInfo.tr.Finish()
1746 }
1747}
1748
// streamKey is the context key under which a ServerTransportStream is stored.
// Being an unexported empty struct type, it cannot collide with keys defined
// by other packages.
type streamKey struct{}
1751
1752// NewContextWithServerTransportStream creates a new context from ctx and
1753// attaches stream to it.
1754//
Joey Armstrongba3d9d12024-01-15 14:22:11 -05001755// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -04001756//
1757// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1758// later release.
1759func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1760 return context.WithValue(ctx, streamKey{}, stream)
1761}
1762
1763// ServerTransportStream is a minimal interface that a transport stream must
1764// implement. This can be used to mock an actual transport stream for tests of
1765// handler code that use, for example, grpc.SetHeader (which requires some
1766// stream to be in context).
1767//
1768// See also NewContextWithServerTransportStream.
1769//
Joey Armstrongba3d9d12024-01-15 14:22:11 -05001770// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -04001771//
1772// Notice: This type is EXPERIMENTAL and may be changed or removed in a
1773// later release.
1774type ServerTransportStream interface {
1775 Method() string
1776 SetHeader(md metadata.MD) error
1777 SendHeader(md metadata.MD) error
1778 SetTrailer(md metadata.MD) error
1779}
1780
1781// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1782// ctx. Returns nil if the given context has no stream associated with it
1783// (which implies it is not an RPC invocation context).
1784//
Joey Armstrongba3d9d12024-01-15 14:22:11 -05001785// # Experimental
khenaidoo5fc5cea2021-08-11 17:39:16 -04001786//
1787// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1788// later release.
1789func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1790 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1791 return s
1792}
1793
1794// Stop stops the gRPC server. It immediately closes all open
1795// connections and listeners.
1796// It cancels all active RPCs on the server side and the corresponding
1797// pending RPCs on the client side will get notified by connection
1798// errors.
1799func (s *Server) Stop() {
1800 s.quit.Fire()
1801
1802 defer func() {
1803 s.serveWG.Wait()
1804 s.done.Fire()
1805 }()
1806
Akash Kankanala761955c2024-02-21 19:32:20 +05301807 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
khenaidoo5fc5cea2021-08-11 17:39:16 -04001808
1809 s.mu.Lock()
1810 listeners := s.lis
1811 s.lis = nil
1812 conns := s.conns
1813 s.conns = nil
1814 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1815 s.cv.Broadcast()
1816 s.mu.Unlock()
1817
1818 for lis := range listeners {
1819 lis.Close()
1820 }
1821 for _, cs := range conns {
1822 for st := range cs {
Akash Kankanala761955c2024-02-21 19:32:20 +05301823 st.Close(errors.New("Server.Stop called"))
khenaidoo5fc5cea2021-08-11 17:39:16 -04001824 }
1825 }
1826 if s.opts.numServerWorkers > 0 {
1827 s.stopServerWorkers()
1828 }
1829
1830 s.mu.Lock()
1831 if s.events != nil {
1832 s.events.Finish()
1833 s.events = nil
1834 }
1835 s.mu.Unlock()
1836}
1837
1838// GracefulStop stops the gRPC server gracefully. It stops the server from
1839// accepting new connections and RPCs and blocks until all the pending RPCs are
1840// finished.
1841func (s *Server) GracefulStop() {
1842 s.quit.Fire()
1843 defer s.done.Fire()
1844
Akash Kankanala761955c2024-02-21 19:32:20 +05301845 s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
khenaidoo5fc5cea2021-08-11 17:39:16 -04001846 s.mu.Lock()
1847 if s.conns == nil {
1848 s.mu.Unlock()
1849 return
1850 }
1851
1852 for lis := range s.lis {
1853 lis.Close()
1854 }
1855 s.lis = nil
1856 if !s.drain {
1857 for _, conns := range s.conns {
1858 for st := range conns {
Akash Kankanala761955c2024-02-21 19:32:20 +05301859 st.Drain("graceful_stop")
khenaidoo5fc5cea2021-08-11 17:39:16 -04001860 }
1861 }
1862 s.drain = true
1863 }
1864
1865 // Wait for serving threads to be ready to exit. Only then can we be sure no
1866 // new conns will be created.
1867 s.mu.Unlock()
1868 s.serveWG.Wait()
1869 s.mu.Lock()
1870
1871 for len(s.conns) != 0 {
1872 s.cv.Wait()
1873 }
1874 s.conns = nil
1875 if s.events != nil {
1876 s.events.Finish()
1877 s.events = nil
1878 }
1879 s.mu.Unlock()
1880}
1881
1882// contentSubtype must be lowercase
1883// cannot return nil
1884func (s *Server) getCodec(contentSubtype string) baseCodec {
1885 if s.opts.codec != nil {
1886 return s.opts.codec
1887 }
1888 if contentSubtype == "" {
1889 return encoding.GetCodec(proto.Name)
1890 }
1891 codec := encoding.GetCodec(contentSubtype)
1892 if codec == nil {
1893 return encoding.GetCodec(proto.Name)
1894 }
1895 return codec
1896}
1897
Akash Kankanala761955c2024-02-21 19:32:20 +05301898// SetHeader sets the header metadata to be sent from the server to the client.
1899// The context provided must be the context passed to the server's handler.
1900//
1901// Streaming RPCs should prefer the SetHeader method of the ServerStream.
1902//
1903// When called multiple times, all the provided metadata will be merged. All
1904// the metadata will be sent out when one of the following happens:
1905//
1906// - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
1907// - The first response message is sent. For unary handlers, this occurs when
1908// the handler returns; for streaming handlers, this can happen when stream's
1909// SendMsg method is called.
1910// - An RPC status is sent out (error or success). This occurs when the handler
1911// returns.
1912//
1913// SetHeader will fail if called after any of the events above.
1914//
1915// The error returned is compatible with the status package. However, the
1916// status code will often not match the RPC status as seen by the client
1917// application, and therefore, should not be relied upon for this purpose.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001918func SetHeader(ctx context.Context, md metadata.MD) error {
1919 if md.Len() == 0 {
1920 return nil
1921 }
1922 stream := ServerTransportStreamFromContext(ctx)
1923 if stream == nil {
1924 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1925 }
1926 return stream.SetHeader(md)
1927}
1928
Akash Kankanala761955c2024-02-21 19:32:20 +05301929// SendHeader sends header metadata. It may be called at most once, and may not
1930// be called after any event that causes headers to be sent (see SetHeader for
1931// a complete list). The provided md and headers set by SetHeader() will be
1932// sent.
1933//
1934// The error returned is compatible with the status package. However, the
1935// status code will often not match the RPC status as seen by the client
1936// application, and therefore, should not be relied upon for this purpose.
khenaidoo5fc5cea2021-08-11 17:39:16 -04001937func SendHeader(ctx context.Context, md metadata.MD) error {
1938 stream := ServerTransportStreamFromContext(ctx)
1939 if stream == nil {
1940 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1941 }
1942 if err := stream.SendHeader(md); err != nil {
1943 return toRPCErr(err)
1944 }
1945 return nil
1946}
1947
Akash Kankanala761955c2024-02-21 19:32:20 +05301948// SetSendCompressor sets a compressor for outbound messages from the server.
1949// It must not be called after any event that causes headers to be sent
1950// (see ServerStream.SetHeader for the complete list). Provided compressor is
1951// used when below conditions are met:
1952//
1953// - compressor is registered via encoding.RegisterCompressor
1954// - compressor name must exist in the client advertised compressor names
1955// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
1956// get client supported compressor names.
1957//
1958// The context provided must be the context passed to the server's handler.
1959// It must be noted that compressor name encoding.Identity disables the
1960// outbound compression.
1961// By default, server messages will be sent using the same compressor with
1962// which request messages were sent.
1963//
1964// It is not safe to call SetSendCompressor concurrently with SendHeader and
1965// SendMsg.
1966//
1967// # Experimental
1968//
1969// Notice: This function is EXPERIMENTAL and may be changed or removed in a
1970// later release.
1971func SetSendCompressor(ctx context.Context, name string) error {
1972 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
1973 if !ok || stream == nil {
1974 return fmt.Errorf("failed to fetch the stream from the given context")
1975 }
1976
1977 if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
1978 return fmt.Errorf("unable to set send compressor: %w", err)
1979 }
1980
1981 return stream.SetSendCompress(name)
1982}
1983
1984// ClientSupportedCompressors returns compressor names advertised by the client
1985// via grpc-accept-encoding header.
1986//
1987// The context provided must be the context passed to the server's handler.
1988//
1989// # Experimental
1990//
1991// Notice: This function is EXPERIMENTAL and may be changed or removed in a
1992// later release.
1993func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
1994 stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
1995 if !ok || stream == nil {
1996 return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
1997 }
1998
1999 return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
2000}
2001
khenaidoo5fc5cea2021-08-11 17:39:16 -04002002// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
2003// When called more than once, all the provided metadata will be merged.
Akash Kankanala761955c2024-02-21 19:32:20 +05302004//
2005// The error returned is compatible with the status package. However, the
2006// status code will often not match the RPC status as seen by the client
2007// application, and therefore, should not be relied upon for this purpose.
khenaidoo5fc5cea2021-08-11 17:39:16 -04002008func SetTrailer(ctx context.Context, md metadata.MD) error {
2009 if md.Len() == 0 {
2010 return nil
2011 }
2012 stream := ServerTransportStreamFromContext(ctx)
2013 if stream == nil {
2014 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
2015 }
2016 return stream.SetTrailer(md)
2017}
2018
2019// Method returns the method string for the server context. The returned
2020// string is in the format of "/service/method".
2021func Method(ctx context.Context) (string, bool) {
2022 s := ServerTransportStreamFromContext(ctx)
2023 if s == nil {
2024 return "", false
2025 }
2026 return s.Method(), true
2027}
2028
2029type channelzServer struct {
2030 s *Server
2031}
2032
2033func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
2034 return c.s.channelzMetric()
2035}
Akash Kankanala761955c2024-02-21 19:32:20 +05302036
2037// validateSendCompressor returns an error when given compressor name cannot be
2038// handled by the server or the client based on the advertised compressors.
2039func validateSendCompressor(name, clientCompressors string) error {
2040 if name == encoding.Identity {
2041 return nil
2042 }
2043
2044 if !grpcutil.IsCompressorNameRegistered(name) {
2045 return fmt.Errorf("compressor not registered %q", name)
2046 }
2047
2048 for _, c := range strings.Split(clientCompressors, ",") {
2049 if c == name {
2050 return nil // found match
2051 }
2052 }
2053 return fmt.Errorf("client does not support compressor %q", name)
2054}