package grpc_prometheus

import (
	"io"

	prom "github.com/prometheus/client_golang/prometheus"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
	clientStartedCounter          *prom.CounterVec
	clientHandledCounter          *prom.CounterVec
	clientStreamMsgReceived      *prom.CounterVec
	clientStreamMsgSent          *prom.CounterVec
	clientHandledHistogramEnabled bool
	clientHandledHistogramOpts    prom.HistogramOpts
	clientHandledHistogram        *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
// ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
	opts := counterOptions(counterOpts)
	return &ClientMetrics{
		clientStartedCounter: prom.NewCounterVec(
			opts.apply(prom.CounterOpts{
				Name: "grpc_client_started_total",
				Help: "Total number of RPCs started on the client.",
			}), []string{"grpc_type", "grpc_service", "grpc_method"}),

		clientHandledCounter: prom.NewCounterVec(
			opts.apply(prom.CounterOpts{
				Name: "grpc_client_handled_total",
				Help: "Total number of RPCs completed by the client, regardless of success or failure.",
			}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),

		clientStreamMsgReceived: prom.NewCounterVec(
			opts.apply(prom.CounterOpts{
				Name: "grpc_client_msg_received_total",
				Help: "Total number of RPC stream messages received by the client.",
			}), []string{"grpc_type", "grpc_service", "grpc_method"}),

		clientStreamMsgSent: prom.NewCounterVec(
			opts.apply(prom.CounterOpts{
				Name: "grpc_client_msg_sent_total",
				Help: "Total number of gRPC stream messages sent by the client.",
			}), []string{"grpc_type", "grpc_service", "grpc_method"}),

		clientHandledHistogramEnabled: false,
		clientHandledHistogramOpts: prom.HistogramOpts{
			Name:    "grpc_client_handling_seconds",
			Help:    "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
			Buckets: prom.DefBuckets,
		},
		clientHandledHistogram: nil,
	}
}
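
// Illustrative only: a minimal sketch of registering ClientMetrics on a
// non-default registry, using the prometheus client library imported above.
// The variable names are hypothetical, not part of this package.
//
//	reg := prom.NewRegistry()
//	clientMetrics := NewClientMetrics()
//	reg.MustRegister(clientMetrics)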

// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
	m.clientStartedCounter.Describe(ch)
	m.clientHandledCounter.Describe(ch)
	m.clientStreamMsgReceived.Describe(ch)
	m.clientStreamMsgSent.Describe(ch)
	if m.clientHandledHistogramEnabled {
		m.clientHandledHistogram.Describe(ch)
	}
}

// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
	m.clientStartedCounter.Collect(ch)
	m.clientHandledCounter.Collect(ch)
	m.clientStreamMsgReceived.Collect(ch)
	m.clientStreamMsgSent.Collect(ch)
	if m.clientHandledHistogramEnabled {
		m.clientHandledHistogram.Collect(ch)
	}
}
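
// Because ClientMetrics implements both Describe and Collect, it satisfies the
// prom.Collector interface. A compile-time assertion to that effect could be
// written as in the following sketch (shown for illustration, not taken from
// the original source):
//
//	var _ prom.Collector = (*ClientMetrics)(nil)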

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
	for _, o := range opts {
		o(&m.clientHandledHistogramOpts)
	}
	if !m.clientHandledHistogramEnabled {
		m.clientHandledHistogram = prom.NewHistogramVec(
			m.clientHandledHistogramOpts,
			[]string{"grpc_type", "grpc_service", "grpc_method"},
		)
	}
	m.clientHandledHistogramEnabled = true
}
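
// A hedged usage sketch for enabling the handling-time histogram with custom
// buckets. It assumes this package exposes a WithHistogramBuckets
// HistogramOption and that clientMetrics was created earlier; both are
// assumptions for illustration.
//
//	clientMetrics.EnableClientHandlingTimeHistogram(
//		WithHistogramBuckets([]float64{0.01, 0.1, 0.5, 1, 5}),
//	)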

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		monitor := newClientReporter(m, Unary, method)
		monitor.SentMessage()
		err := invoker(ctx, method, req, reply, cc, opts...)
		if err == nil {
			// A unary reply is only actually received when the call succeeds.
			monitor.ReceivedMessage()
		}
		st, _ := status.FromError(err)
		monitor.Handled(st.Code())
		return err
	}
}
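
// An illustrative sketch of installing the unary interceptor on a client
// connection; the target address, dial options, and clientMetrics variable
// are placeholders.
//
//	conn, err := grpc.Dial(
//		"localhost:50051",
//		grpc.WithInsecure(),
//		grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
//	)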

// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		monitor := newClientReporter(m, clientStreamType(desc), method)
		clientStream, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			st, _ := status.FromError(err)
			monitor.Handled(st.Code())
			return nil, err
		}
		return &monitoredClientStream{clientStream, monitor}, nil
	}
}
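
// Similarly, an illustrative sketch of installing the stream interceptor; the
// address and other dial options are placeholders.
//
//	conn, err := grpc.Dial(
//		"localhost:50051",
//		grpc.WithInsecure(),
//		grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()),
//	)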

func clientStreamType(desc *grpc.StreamDesc) grpcType {
	if desc.ClientStreams && !desc.ServerStreams {
		return ClientStream
	} else if !desc.ClientStreams && desc.ServerStreams {
		return ServerStream
	}
	return BidiStream
}

// monitoredClientStream wraps grpc.ClientStream so that every message sent or
// received on the stream increments the corresponding Prometheus counters.
type monitoredClientStream struct {
	grpc.ClientStream
	monitor *clientReporter
}

func (s *monitoredClientStream) SendMsg(m interface{}) error {
	err := s.ClientStream.SendMsg(m)
	if err == nil {
		s.monitor.SentMessage()
	}
	return err
}

func (s *monitoredClientStream) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m)
	if err == nil {
		s.monitor.ReceivedMessage()
	} else if err == io.EOF {
		// io.EOF marks the normal end of the stream, so the RPC is reported
		// as handled with codes.OK rather than as an error.
		s.monitor.Handled(codes.OK)
	} else {
		st, _ := status.FromError(err)
		s.monitor.Handled(st.Code())
	}
	return err
}