blob: eca5ff6f3b9882a840779c865982121fbac85f47 [file] [log] [blame]
mpagenkoaf801632020-07-03 10:00:42 +00001// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rpcmetrics
16
17import (
18 "strconv"
19 "sync"
20 "time"
21
22 "github.com/opentracing/opentracing-go"
23 "github.com/opentracing/opentracing-go/ext"
24 "github.com/uber/jaeger-lib/metrics"
25
26 jaeger "github.com/uber/jaeger-client-go"
27)
28
29const defaultMaxNumberOfEndpoints = 200
30
31// Observer is an observer that can emit RPC metrics.
32type Observer struct {
33 metricsByEndpoint *MetricsByEndpoint
34}
35
36// NewObserver creates a new observer that can emit RPC metrics.
37func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
38 return &Observer{
39 metricsByEndpoint: newMetricsByEndpoint(
40 metricsFactory,
41 normalizer,
42 defaultMaxNumberOfEndpoints,
43 ),
44 }
45}
46
47// OnStartSpan creates a new Observer for the span.
48func (o *Observer) OnStartSpan(
49 operationName string,
50 options opentracing.StartSpanOptions,
51) jaeger.SpanObserver {
52 return NewSpanObserver(o.metricsByEndpoint, operationName, options)
53}
54
55// SpanKind identifies the span as inboud, outbound, or internal
56type SpanKind int
57
58const (
59 // Local span kind
60 Local SpanKind = iota
61 // Inbound span kind
62 Inbound
63 // Outbound span kind
64 Outbound
65)
66
67// SpanObserver collects RPC metrics
68type SpanObserver struct {
69 metricsByEndpoint *MetricsByEndpoint
70 operationName string
71 startTime time.Time
72 mux sync.Mutex
73 kind SpanKind
74 httpStatusCode uint16
75 err bool
76}
77
78// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
79func NewSpanObserver(
80 metricsByEndpoint *MetricsByEndpoint,
81 operationName string,
82 options opentracing.StartSpanOptions,
83) *SpanObserver {
84 so := &SpanObserver{
85 metricsByEndpoint: metricsByEndpoint,
86 operationName: operationName,
87 startTime: options.StartTime,
88 }
89 for k, v := range options.Tags {
90 so.handleTagInLock(k, v)
91 }
92 return so
93}
94
95// handleTags watches for special tags
96// - SpanKind
97// - HttpStatusCode
98// - Error
99func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
100 if key == string(ext.SpanKind) {
101 if v, ok := value.(ext.SpanKindEnum); ok {
102 value = string(v)
103 }
104 if v, ok := value.(string); ok {
105 if v == string(ext.SpanKindRPCClientEnum) {
106 so.kind = Outbound
107 } else if v == string(ext.SpanKindRPCServerEnum) {
108 so.kind = Inbound
109 }
110 }
111 return
112 }
113 if key == string(ext.HTTPStatusCode) {
114 if v, ok := value.(uint16); ok {
115 so.httpStatusCode = v
116 } else if v, ok := value.(int); ok {
117 so.httpStatusCode = uint16(v)
118 } else if v, ok := value.(string); ok {
119 if vv, err := strconv.Atoi(v); err == nil {
120 so.httpStatusCode = uint16(vv)
121 }
122 }
123 return
124 }
125 if key == string(ext.Error) {
126 if v, ok := value.(bool); ok {
127 so.err = v
128 } else if v, ok := value.(string); ok {
129 if vv, err := strconv.ParseBool(v); err == nil {
130 so.err = vv
131 }
132 }
133 return
134 }
135}
136
137// OnFinish emits the RPC metrics. It only has an effect when operation name
138// is not blank, and the span kind is an RPC server.
139func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
140 so.mux.Lock()
141 defer so.mux.Unlock()
142
143 if so.operationName == "" || so.kind != Inbound {
144 return
145 }
146
147 mets := so.metricsByEndpoint.get(so.operationName)
148 latency := options.FinishTime.Sub(so.startTime)
149 if so.err {
150 mets.RequestCountFailures.Inc(1)
151 mets.RequestLatencyFailures.Record(latency)
152 } else {
153 mets.RequestCountSuccess.Inc(1)
154 mets.RequestLatencySuccess.Record(latency)
155 }
156 mets.recordHTTPStatusCode(so.httpStatusCode)
157}
158
159// OnSetOperationName records new operation name.
160func (so *SpanObserver) OnSetOperationName(operationName string) {
161 so.mux.Lock()
162 so.operationName = operationName
163 so.mux.Unlock()
164}
165
166// OnSetTag implements SpanObserver
167func (so *SpanObserver) OnSetTag(key string, value interface{}) {
168 so.mux.Lock()
169 so.handleTagInLock(key, value)
170 so.mux.Unlock()
171}