blob: b45f4c94490a150b7b80119073ea0baf8208c124 [file] [log] [blame]
// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package internal
16
17import (
18 "context"
19 "sync"
20 "sync/atomic"
21 "unsafe"
22
23 "go.opentelemetry.io/otel/api/metric"
24 "go.opentelemetry.io/otel/api/metric/registry"
25 "go.opentelemetry.io/otel/label"
26)
27
28// This file contains the forwarding implementation of MeterProvider used as
29// the default global instance. Metric events using instruments provided by
30// this implementation are no-ops until the first Meter implementation is set
31// as the global provider.
32//
33// The implementation here uses Mutexes to maintain a list of active Meters in
34// the MeterProvider and Instruments in each Meter, under the assumption that
35// these interfaces are not performance-critical.
36//
37// We have the invariant that setDelegate() will be called before a new
38// MeterProvider implementation is registered as the global provider. Mutexes
39// in the MeterProvider and Meters ensure that each instrument has a delegate
40// before the global provider is set.
41//
// Bound instrument operations are implemented by delegating to the
// instrument after it is registered, with a sync.Once initializer to
// protect against races with Unbind().
45//
46// Metric uniqueness checking is implemented by calling the exported
47// methods of the api/metric/registry package.
48
// meterKey identifies a Meter by instrumentation name and version. It is the
// key type of the meterProvider.meters map.
type meterKey struct {
	Name    string
	Version string
}
52
53type meterProvider struct {
54 delegate metric.MeterProvider
55
56 // lock protects `delegate` and `meters`.
57 lock sync.Mutex
58
59 // meters maintains a unique entry for every named Meter
60 // that has been registered through the global instance.
61 meters map[meterKey]*meterEntry
62}
63
64type meterImpl struct {
65 delegate unsafe.Pointer // (*metric.MeterImpl)
66
67 lock sync.Mutex
68 syncInsts []*syncImpl
69 asyncInsts []*asyncImpl
70}
71
72type meterEntry struct {
73 unique metric.MeterImpl
74 impl meterImpl
75}
76
77type instrument struct {
78 descriptor metric.Descriptor
79}
80
81type syncImpl struct {
82 delegate unsafe.Pointer // (*metric.SyncImpl)
83
84 instrument
85}
86
87type asyncImpl struct {
88 delegate unsafe.Pointer // (*metric.AsyncImpl)
89
90 instrument
91
92 runner metric.AsyncRunner
93}
94
95// SyncImpler is implemented by all of the sync metric
96// instruments.
97type SyncImpler interface {
98 SyncImpl() metric.SyncImpl
99}
100
101// AsyncImpler is implemented by all of the async
102// metric instruments.
103type AsyncImpler interface {
104 AsyncImpl() metric.AsyncImpl
105}
106
107type syncHandle struct {
108 delegate unsafe.Pointer // (*metric.HandleImpl)
109
110 inst *syncImpl
111 labels []label.KeyValue
112
113 initialize sync.Once
114}
115
116var _ metric.MeterProvider = &meterProvider{}
117var _ metric.MeterImpl = &meterImpl{}
118var _ metric.InstrumentImpl = &syncImpl{}
119var _ metric.BoundSyncImpl = &syncHandle{}
120var _ metric.AsyncImpl = &asyncImpl{}
121
122func (inst *instrument) Descriptor() metric.Descriptor {
123 return inst.descriptor
124}
125
126// MeterProvider interface and delegation
127
128func newMeterProvider() *meterProvider {
129 return &meterProvider{
130 meters: map[meterKey]*meterEntry{},
131 }
132}
133
134func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
135 p.lock.Lock()
136 defer p.lock.Unlock()
137
138 p.delegate = provider
139 for key, entry := range p.meters {
140 entry.impl.setDelegate(key.Name, key.Version, provider)
141 }
142 p.meters = nil
143}
144
145func (p *meterProvider) Meter(instrumentationName string, opts ...metric.MeterOption) metric.Meter {
146 p.lock.Lock()
147 defer p.lock.Unlock()
148
149 if p.delegate != nil {
150 return p.delegate.Meter(instrumentationName, opts...)
151 }
152
153 key := meterKey{
154 Name: instrumentationName,
155 Version: metric.NewMeterConfig(opts...).InstrumentationVersion,
156 }
157 entry, ok := p.meters[key]
158 if !ok {
159 entry = &meterEntry{}
160 entry.unique = registry.NewUniqueInstrumentMeterImpl(&entry.impl)
161 p.meters[key] = entry
162
163 }
164 return metric.WrapMeterImpl(entry.unique, key.Name, metric.WithInstrumentationVersion(key.Version))
165}
166
167// Meter interface and delegation
168
169func (m *meterImpl) setDelegate(name, version string, provider metric.MeterProvider) {
170 m.lock.Lock()
171 defer m.lock.Unlock()
172
173 d := new(metric.MeterImpl)
174 *d = provider.Meter(name, metric.WithInstrumentationVersion(version)).MeterImpl()
175 m.delegate = unsafe.Pointer(d)
176
177 for _, inst := range m.syncInsts {
178 inst.setDelegate(*d)
179 }
180 m.syncInsts = nil
181 for _, obs := range m.asyncInsts {
182 obs.setDelegate(*d)
183 }
184 m.asyncInsts = nil
185}
186
187func (m *meterImpl) NewSyncInstrument(desc metric.Descriptor) (metric.SyncImpl, error) {
188 m.lock.Lock()
189 defer m.lock.Unlock()
190
191 if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
192 return (*meterPtr).NewSyncInstrument(desc)
193 }
194
195 inst := &syncImpl{
196 instrument: instrument{
197 descriptor: desc,
198 },
199 }
200 m.syncInsts = append(m.syncInsts, inst)
201 return inst, nil
202}
203
204// Synchronous delegation
205
206func (inst *syncImpl) setDelegate(d metric.MeterImpl) {
207 implPtr := new(metric.SyncImpl)
208
209 var err error
210 *implPtr, err = d.NewSyncInstrument(inst.descriptor)
211
212 if err != nil {
213 // TODO: There is no standard way to deliver this error to the user.
214 // See https://github.com/open-telemetry/opentelemetry-go/issues/514
215 // Note that the default SDK will not generate any errors yet, this is
216 // only for added safety.
217 panic(err)
218 }
219
220 atomic.StorePointer(&inst.delegate, unsafe.Pointer(implPtr))
221}
222
223func (inst *syncImpl) Implementation() interface{} {
224 if implPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); implPtr != nil {
225 return (*implPtr).Implementation()
226 }
227 return inst
228}
229
230func (inst *syncImpl) Bind(labels []label.KeyValue) metric.BoundSyncImpl {
231 if implPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); implPtr != nil {
232 return (*implPtr).Bind(labels)
233 }
234 return &syncHandle{
235 inst: inst,
236 labels: labels,
237 }
238}
239
240func (bound *syncHandle) Unbind() {
241 bound.initialize.Do(func() {})
242
243 implPtr := (*metric.BoundSyncImpl)(atomic.LoadPointer(&bound.delegate))
244
245 if implPtr == nil {
246 return
247 }
248
249 (*implPtr).Unbind()
250}
251
252// Async delegation
253
254func (m *meterImpl) NewAsyncInstrument(
255 desc metric.Descriptor,
256 runner metric.AsyncRunner,
257) (metric.AsyncImpl, error) {
258
259 m.lock.Lock()
260 defer m.lock.Unlock()
261
262 if meterPtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
263 return (*meterPtr).NewAsyncInstrument(desc, runner)
264 }
265
266 inst := &asyncImpl{
267 instrument: instrument{
268 descriptor: desc,
269 },
270 runner: runner,
271 }
272 m.asyncInsts = append(m.asyncInsts, inst)
273 return inst, nil
274}
275
276func (obs *asyncImpl) Implementation() interface{} {
277 if implPtr := (*metric.AsyncImpl)(atomic.LoadPointer(&obs.delegate)); implPtr != nil {
278 return (*implPtr).Implementation()
279 }
280 return obs
281}
282
283func (obs *asyncImpl) setDelegate(d metric.MeterImpl) {
284 implPtr := new(metric.AsyncImpl)
285
286 var err error
287 *implPtr, err = d.NewAsyncInstrument(obs.descriptor, obs.runner)
288
289 if err != nil {
290 // TODO: There is no standard way to deliver this error to the user.
291 // See https://github.com/open-telemetry/opentelemetry-go/issues/514
292 // Note that the default SDK will not generate any errors yet, this is
293 // only for added safety.
294 panic(err)
295 }
296
297 atomic.StorePointer(&obs.delegate, unsafe.Pointer(implPtr))
298}
299
300// Metric updates
301
302func (m *meterImpl) RecordBatch(ctx context.Context, labels []label.KeyValue, measurements ...metric.Measurement) {
303 if delegatePtr := (*metric.MeterImpl)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil {
304 (*delegatePtr).RecordBatch(ctx, labels, measurements...)
305 }
306}
307
308func (inst *syncImpl) RecordOne(ctx context.Context, number metric.Number, labels []label.KeyValue) {
309 if instPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); instPtr != nil {
310 (*instPtr).RecordOne(ctx, number, labels)
311 }
312}
313
314// Bound instrument initialization
315
316func (bound *syncHandle) RecordOne(ctx context.Context, number metric.Number) {
317 instPtr := (*metric.SyncImpl)(atomic.LoadPointer(&bound.inst.delegate))
318 if instPtr == nil {
319 return
320 }
321 var implPtr *metric.BoundSyncImpl
322 bound.initialize.Do(func() {
323 implPtr = new(metric.BoundSyncImpl)
324 *implPtr = (*instPtr).Bind(bound.labels)
325 atomic.StorePointer(&bound.delegate, unsafe.Pointer(implPtr))
326 })
327 if implPtr == nil {
328 implPtr = (*metric.BoundSyncImpl)(atomic.LoadPointer(&bound.delegate))
329 }
330 // This may still be nil if instrument was created and bound
331 // without a delegate, then the instrument was set to have a
332 // delegate and unbound.
333 if implPtr == nil {
334 return
335 }
336 (*implPtr).RecordOne(ctx, number)
337}
338
339func AtomicFieldOffsets() map[string]uintptr {
340 return map[string]uintptr{
341 "meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
342 "meterImpl.delegate": unsafe.Offsetof(meterImpl{}.delegate),
343 "syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
344 "asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
345 "syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
346 }
347}