blob: 43161868b9b9097f601bf3c50fe5288a85573199 [file] [log] [blame]
Girish Gowdrae09a6202021-01-12 18:10:59 -08001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "github.com/opencord/omci-lib-go"
25 me "github.com/opencord/omci-lib-go/generated"
26 "github.com/opencord/voltha-lib-go/v4/pkg/log"
27 "github.com/opencord/voltha-protos/v4/go/voltha"
Girish Gowdra5a7c4922021-01-22 18:33:41 -080028 "sync"
Girish Gowdrae09a6202021-01-12 18:10:59 -080029 "time"
30)
31
// Settings that apply to metric collection as a whole.
const (
	// DefaultMetricCollectionFrequency is the default collection period, in seconds.
	// It can be changed through the voltha NBI PmConfig configuration.
	DefaultMetricCollectionFrequency = 15 * 60
	// GroupMetricEnabled is READONLY and cannot be changed from the VOLTHA NBI.
	GroupMetricEnabled = true
	// DefaultFrequencyOverrideEnabled is READONLY and cannot be changed from the VOLTHA NBI.
	DefaultFrequencyOverrideEnabled = true
	// FrequencyGranularity is the step, in seconds, that a frequency has to be a multiple of.
	// This setting cannot be changed later.
	FrequencyGranularity = 5
)
39
40// OpticalPowerGroupMetrics are supported optical pm names
41var OpticalPowerGroupMetrics = map[string]voltha.PmConfig_PmType{
42 "ani_g_instance_id": voltha.PmConfig_CONTEXT,
43 "transmit_power": voltha.PmConfig_GAUGE,
44 "receive_power": voltha.PmConfig_GAUGE,
45}
46
// Defaults that apply to the OpticalPower metric group only.
const (
	// OpticalPowerGroupMetricName is the name of the optical power metric group.
	OpticalPowerGroupMetricName = "OpticalPower"
	// OpticalPowerGroupMetricEnabled is the default enable state of the group.
	// It can be changed through the voltha NBI PmConfig configuration.
	OpticalPowerGroupMetricEnabled = true
	// OpticalPowerMetricGroupCollectionFrequency is the default collection period of the
	// group, in seconds. It can be changed through the voltha NBI PmConfig configuration.
	OpticalPowerMetricGroupCollectionFrequency = 5 * 60
)
53
54// UniStatusGroupMetrics are supported UNI status names
55var UniStatusGroupMetrics = map[string]voltha.PmConfig_PmType{
56 "uni_port_no": voltha.PmConfig_CONTEXT,
57 "ethernet_type": voltha.PmConfig_GAUGE,
58 "oper_status": voltha.PmConfig_GAUGE,
59 "uni_admin_state": voltha.PmConfig_GAUGE,
60}
61
// Defaults that apply to the UniStatus metric group only.
const (
	// UniStatusGroupMetricName is the name of the UNI status metric group.
	UniStatusGroupMetricName = "UniStatus"
	// UniStatusGroupMetricEnabled is the default enable state of the group.
	// It can be changed through the voltha NBI PmConfig configuration.
	UniStatusGroupMetricEnabled = true
	// UniStatusMetricGroupCollectionFrequency is the default collection period of the
	// group, in seconds. It can be changed through the voltha NBI PmConfig configuration.
	UniStatusMetricGroupCollectionFrequency = 5 * 60
)
68
69type groupMetric struct {
70 groupName string
71 enabled bool
72 frequency uint32 // valid only if FrequencyOverride is enabled.
73 metricMap map[string]voltha.PmConfig_PmType
74 nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
75}
76
// standaloneMetric is the adapter-local bookkeeping kept for one metric that is
// not part of any group.
type standaloneMetric struct {
	// metricName is the name the metric is registered under.
	metricName string
	// enabled tells whether the metric is currently collected.
	enabled bool
	// frequency is the metric specific collection period in seconds.
	// Valid only if FrequencyOverride is enabled.
	frequency uint32
	// nextCollectionInterval is the time the metric is due to be collected next.
	// Valid only if FrequencyOverride is enabled.
	nextCollectionInterval time.Time
}
83
Girish Gowdrae09a6202021-01-12 18:10:59 -080084type onuMetricsManager struct {
85 pDeviceHandler *deviceHandler
86
87 commMetricsChan chan Message
88 opticalMetricsChan chan me.AttributeValueMap
89 uniStatusMetricsChan chan me.AttributeValueMap
90
Girish Gowdra5a7c4922021-01-22 18:33:41 -080091 groupMetricMap map[string]*groupMetric
92 standaloneMetricMap map[string]*standaloneMetric
93
Girish Gowdrae09a6202021-01-12 18:10:59 -080094 stopProcessingOmciResponses chan bool
Girish Gowdra5a7c4922021-01-22 18:33:41 -080095
96 nextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
97
98 onuMetricsManagerLock sync.RWMutex
Girish Gowdrae09a6202021-01-12 18:10:59 -080099}
100
101// newonuMetricsManager returns a new instance of the newonuMetricsManager
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800102// Note that none of the context stored internally in onuMetricsManager is backed up on KV store for resiliency.
103// Metric collection is not a critical operation that needs support for resiliency. On adapter restart, some context
104// could be lost (except for Device.PmConfigs which is backed up the rw-core on KV store). An example of information
105// that is lost on adapter restart is nextCollectionInterval time.
Girish Gowdrae09a6202021-01-12 18:10:59 -0800106func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
107
108 var metricsManager onuMetricsManager
109 logger.Debugw(ctx, "init-onuMetricsManager", log.Fields{"device-id": dh.deviceID})
110 metricsManager.pDeviceHandler = dh
111
112 metricsManager.commMetricsChan = make(chan Message)
113 metricsManager.opticalMetricsChan = make(chan me.AttributeValueMap)
114 metricsManager.uniStatusMetricsChan = make(chan me.AttributeValueMap)
115 metricsManager.stopProcessingOmciResponses = make(chan bool)
116
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800117 metricsManager.groupMetricMap = make(map[string]*groupMetric)
118 metricsManager.standaloneMetricMap = make(map[string]*standaloneMetric)
119
120 if dh.pmConfigs == nil { // dh.pmConfigs is NOT nil if adapter comes back from a restart. We should NOT go back to defaults in this case
121 dh.pmConfigs = &voltha.PmConfigs{}
122 dh.pmConfigs.Id = dh.deviceID
123 dh.pmConfigs.DefaultFreq = DefaultMetricCollectionFrequency
124 dh.pmConfigs.Grouped = GroupMetricEnabled
125 dh.pmConfigs.FreqOverride = DefaultFrequencyOverrideEnabled
126
127 // Populate group metrics.
128 // Lets populate irrespective of GroupMetricEnabled is true or not.
129 // The group metrics collection will decided on this flag later
130
131 // Populate optical power group metrics
132 var opPmConfigSlice []*voltha.PmConfig
133 for k, v := range OpticalPowerGroupMetrics {
134 opPmConfigSlice = append(opPmConfigSlice, &voltha.PmConfig{Name: k, Type: v})
135 }
136 opticalPowerGroupMetric := voltha.PmGroupConfig{
137 GroupName: OpticalPowerGroupMetricName,
138 Enabled: OpticalPowerGroupMetricEnabled,
139 GroupFreq: OpticalPowerMetricGroupCollectionFrequency,
140 Metrics: opPmConfigSlice,
141 }
142 dh.pmConfigs.Groups = append(dh.pmConfigs.Groups, &opticalPowerGroupMetric)
143
144 // Populate uni status group metrics
145 var uniStPmConfigSlice []*voltha.PmConfig
146 for k, v := range UniStatusGroupMetrics {
147 uniStPmConfigSlice = append(uniStPmConfigSlice, &voltha.PmConfig{Name: k, Type: v})
148 }
149 uniStatusGroupMetric := voltha.PmGroupConfig{
150 GroupName: UniStatusGroupMetricName,
151 Enabled: UniStatusGroupMetricEnabled,
152 GroupFreq: UniStatusMetricGroupCollectionFrequency,
153 Metrics: uniStPmConfigSlice,
154 }
155 dh.pmConfigs.Groups = append(dh.pmConfigs.Groups, &uniStatusGroupMetric)
156
157 // Add standalone metric (if present) after this (will be added to dh.pmConfigs.Metrics)
158 }
159
160 // Populate local group metric structures
161 for _, g := range dh.pmConfigs.Groups {
162 metricsManager.groupMetricMap[g.GroupName] = &groupMetric{
163 groupName: g.GroupName,
164 enabled: g.Enabled,
165 frequency: g.GroupFreq,
166 }
167 switch g.GroupName {
168 case OpticalPowerGroupMetricName:
169 metricsManager.groupMetricMap[g.GroupName].metricMap = OpticalPowerGroupMetrics
170 case UniStatusGroupMetricName:
171 metricsManager.groupMetricMap[g.GroupName].metricMap = UniStatusGroupMetrics
172
173 default:
174 logger.Errorw(ctx, "unhandled-group-name", log.Fields{"groupName": g.GroupName})
175 }
176 }
177
178 // Populate local standalone metric structures
179 for _, m := range dh.pmConfigs.Metrics {
180 metricsManager.standaloneMetricMap[m.Name] = &standaloneMetric{
181 metricName: m.Name,
182 enabled: m.Enabled,
183 frequency: m.SampleFreq,
184 }
185 switch m.Name {
186 // None exist as of now. Add when available.
187 default:
188 logger.Errorw(ctx, "unhandled-metric-name", log.Fields{"metricName": m.Name})
189 }
190 }
191
192 // initialize the next metric collection intervals.
193 metricsManager.initializeMetricCollectionTime(ctx)
194 logger.Info(ctx, "init-onuMetricsManager completed", log.Fields{"device-id": dh.deviceID})
Girish Gowdrae09a6202021-01-12 18:10:59 -0800195 return &metricsManager
196}
197
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800198func (mm *onuMetricsManager) initializeMetricCollectionTime(ctx context.Context) {
199 if mm.pDeviceHandler.pmConfigs.FreqOverride {
200 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to true, then group/standalone metric specific interval applies
201 mm.onuMetricsManagerLock.Lock()
202 defer mm.onuMetricsManagerLock.Unlock()
203 for _, v := range mm.groupMetricMap {
204 if v.enabled {
205 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
206 }
207 }
208
209 for _, v := range mm.standaloneMetricMap {
210 if v.enabled {
211 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
212 }
213 }
214 } else {
215 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to false, then overall metric specific interval applies
216 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
217 }
218 logger.Infow(ctx, "initialized standalone group/metric collection time", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
219}
220
221func (mm *onuMetricsManager) updateDefaultFrequency(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
222 // Verify that the configured DefaultFrequency is > 0 and is a multiple of FrequencyGranularity
223 if pmConfigs.DefaultFreq == 0 && pmConfigs.DefaultFreq%FrequencyGranularity != 0 {
224 logger.Errorf(ctx, "frequency-%u-should-be-a-multiple-of-%u", pmConfigs.DefaultFreq, FrequencyGranularity)
225 return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", pmConfigs.DefaultFreq, FrequencyGranularity)
226 }
227 mm.pDeviceHandler.pmConfigs.DefaultFreq = pmConfigs.DefaultFreq
228 // re-set the nextGlobalMetricCollectionTime based on the new DefaultFreq
229 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
230 logger.Debugw(ctx, "frequency-updated--new-frequency", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "frequency": mm.pDeviceHandler.pmConfigs.DefaultFreq})
231 return nil
232}
233
234func (mm *onuMetricsManager) updateGroupFreq(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
235 var newGroupFreq uint32
236 found := false
237 groupSliceIdx := 0
238 var group *voltha.PmGroupConfig
239 for groupSliceIdx, group = range pmConfigs.Groups {
240 if group.GroupName == aGroupName {
241 if group.GroupFreq != 0 { // freq 0 not allowed
242 newGroupFreq = group.GroupFreq
243 found = true
244 break
245 }
246 }
247 }
248 // if not found update group freq and next collection interval for the group
249 if !found {
250 logger.Errorw(ctx, "group name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
251 return fmt.Errorf("group-name-not-found-%v", aGroupName)
252 }
253
254 updated := false
255 mm.onuMetricsManagerLock.Lock()
256 defer mm.onuMetricsManagerLock.Unlock()
257 for k, v := range mm.groupMetricMap {
258 if k == aGroupName && newGroupFreq != 0 { // freq 0 not allowed
259 v.frequency = newGroupFreq
260 // update internal pm config
261 mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].GroupFreq = newGroupFreq
262 // Also updated the next group metric collection time from now
263 v.nextCollectionInterval = time.Now().Add(time.Duration(newGroupFreq) * time.Second)
264 updated = true
265 logger.Infow(ctx, "group frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
266 }
267 }
268 if !updated {
269 logger.Errorw(ctx, "group frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
270 return fmt.Errorf("internal-error-during-group-freq-update--groupname-%s-freq-%d", aGroupName, newGroupFreq)
271 }
272 return nil
273}
274
275func (mm *onuMetricsManager) updateMetricFreq(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
276 var newMetricFreq uint32
277 found := false
278 metricSliceIdx := 0
279 var metric *voltha.PmConfig
280 for metricSliceIdx, metric = range pmConfigs.Metrics {
281 if metric.Name == aMetricName {
282 if metric.SampleFreq != 0 { // freq 0 not allowed
283 newMetricFreq = metric.SampleFreq
284 found = true
285 break
286 }
287 }
288 }
289 if !found {
290 logger.Errorw(ctx, "metric name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
291 return fmt.Errorf("metric-name-not-found-%v", aMetricName)
292 }
293
294 updated := false
295 mm.onuMetricsManagerLock.Lock()
296 defer mm.onuMetricsManagerLock.Unlock()
297 for k, v := range mm.groupMetricMap {
298 if k == aMetricName && newMetricFreq != 0 {
299 v.frequency = newMetricFreq
300 // update internal pm config
301 mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].SampleFreq = newMetricFreq
302 // Also updated the next standalone metric collection time from now
303 v.nextCollectionInterval = time.Now().Add(time.Duration(newMetricFreq) * time.Second)
304 updated = true
305 logger.Infow(ctx, "metric frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
306 }
307 }
308 if !updated {
309 logger.Errorw(ctx, "metric frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
310 return fmt.Errorf("internal-error-during-standalone-metric-update--matricnane-%s-freq-%d", aMetricName, newMetricFreq)
311 }
312 return nil
313}
314
315func (mm *onuMetricsManager) updateGroupSupport(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
316 groupSliceIdx := 0
317 var group *voltha.PmGroupConfig
318
319 for groupSliceIdx, group = range pmConfigs.Groups {
320 if group.GroupName == aGroupName {
321 break
322 }
323 }
324 if group == nil {
325 logger.Errorw(ctx, "group metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
326 return fmt.Errorf("group-not-found--groupName-%s", aGroupName)
327 }
328
329 updated := false
330 mm.onuMetricsManagerLock.Lock()
331 defer mm.onuMetricsManagerLock.Unlock()
332 for k, v := range mm.groupMetricMap {
333 if k == aGroupName && v.enabled != group.Enabled {
334 mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].Enabled = group.Enabled
335 v.enabled = group.Enabled
336 // If the group is now enabled and frequency override is enabled, set the next group metric collection time
337 if group.Enabled && mm.pDeviceHandler.pmConfigs.FreqOverride {
338 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
339 }
340 updated = true
341 logger.Infow(ctx, "group metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName, "enabled": group.Enabled})
342 }
343 }
344
345 if !updated {
346 logger.Errorw(ctx, "group metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
347 return fmt.Errorf("internal-error-during-group-support-update--groupName-%s", aGroupName)
348 }
349 return nil
350}
351
352func (mm *onuMetricsManager) updateMetricSupport(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
353 metricSliceIdx := 0
354 var metric *voltha.PmConfig
355
356 for metricSliceIdx, metric = range pmConfigs.Metrics {
357 if metric.Name == aMetricName {
358 break
359 }
360 }
361
362 if metric == nil {
363 logger.Errorw(ctx, "standalone metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
364 return fmt.Errorf("metric-not-found--metricname-%s", aMetricName)
365 }
366
367 updated := false
368 mm.onuMetricsManagerLock.Lock()
369 defer mm.onuMetricsManagerLock.Unlock()
370 for k, v := range mm.standaloneMetricMap {
371 if k == aMetricName && v.enabled != metric.Enabled {
372 mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].Enabled = metric.Enabled
373 v.enabled = metric.Enabled
374 // If the standalone metric is now enabled and frequency override is enabled, set the next metric collection time
375 if metric.Enabled && mm.pDeviceHandler.pmConfigs.FreqOverride {
376 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
377 }
378 updated = true
379 logger.Infow(ctx, "standalone metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName, "enabled": metric.Enabled})
380 }
381 }
382 if !updated {
383 logger.Errorw(ctx, "standalone metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
384 return fmt.Errorf("internal-error-during-standalone-support-update--metricname-%s", aMetricName)
385 }
386 return nil
387}
388
389func (mm *onuMetricsManager) collectAllGroupAndStandaloneMetrics(ctx context.Context) {
390 if mm.pDeviceHandler.pmConfigs.Grouped { // metrics are managed as a group.
391 go mm.collectAllGroupMetrics(ctx)
392 } else {
393 go mm.collectAllStandaloneMetrics(ctx)
394 }
395}
396
397func (mm *onuMetricsManager) collectAllGroupMetrics(ctx context.Context) {
398 go func() {
399 logger.Debug(ctx, "startCollector before collecting optical metrics")
400 metricInfo := mm.collectOpticalMetrics(ctx)
401 if metricInfo != nil {
402 mm.publishMetrics(ctx, metricInfo)
403 }
404 }()
405
406 go func() {
407 logger.Debug(ctx, "startCollector before collecting uni metrics")
408 metricInfo := mm.collectUniStatusMetrics(ctx)
409 if metricInfo != nil {
410 mm.publishMetrics(ctx, metricInfo)
411 }
412 }()
413
414 // Add more here
415}
416
417func (mm *onuMetricsManager) collectAllStandaloneMetrics(ctx context.Context) {
418 // None exists as of now, add when available here
419}
420
421func (mm *onuMetricsManager) collectGroupMetric(ctx context.Context, groupName string) {
422 switch groupName {
423 case OpticalPowerGroupMetricName:
424 go func() {
425 if mi := mm.collectOpticalMetrics(ctx); mm != nil {
426 mm.publishMetrics(ctx, mi)
427 }
428 }()
429 case UniStatusGroupMetricName:
430 go func() {
431 if mi := mm.collectUniStatusMetrics(ctx); mm != nil {
432 mm.publishMetrics(ctx, mi)
433 }
434 }()
435 default:
436 logger.Errorw(ctx, "unhandled group metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName})
437 }
438}
439
440func (mm *onuMetricsManager) collectStandaloneMetric(ctx context.Context, metricName string) {
441 switch metricName {
442 // None exist as of now, add when available
443 default:
444 logger.Errorw(ctx, "unhandled standalone metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": metricName})
445 }
446}
447
448// collectOpticalMetrics collects groups metrics related to optical power from ani-g ME.
Girish Gowdrae09a6202021-01-12 18:10:59 -0800449func (mm *onuMetricsManager) collectOpticalMetrics(ctx context.Context) []*voltha.MetricInformation {
450 logger.Debugw(ctx, "collectOpticalMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800451
452 mm.onuMetricsManagerLock.RLock()
453 if !mm.groupMetricMap[OpticalPowerGroupMetricName].enabled {
454 mm.onuMetricsManagerLock.RUnlock()
455 logger.Debugw(ctx, "optical power group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
456 return nil
457 }
458 mm.onuMetricsManagerLock.RUnlock()
459
Girish Gowdrae09a6202021-01-12 18:10:59 -0800460 var metricInfoSlice []*voltha.MetricInformation
461 metricsContext := make(map[string]string)
462 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
463 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
464 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
465
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800466 raisedTs := time.Now().Unix()
Girish Gowdrae09a6202021-01-12 18:10:59 -0800467 mmd := voltha.MetricMetaData{
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800468 Title: OpticalPowerGroupMetricName,
Girish Gowdrae09a6202021-01-12 18:10:59 -0800469 Ts: float64(raisedTs),
470 Context: metricsContext,
471 DeviceId: mm.pDeviceHandler.deviceID,
472 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
473 SerialNo: mm.pDeviceHandler.device.SerialNumber,
474 }
475
Girish Gowdrae09a6202021-01-12 18:10:59 -0800476 // get the ANI-G instance IDs
477 anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
478loop:
479 for _, anigInstID := range anigInstKeys {
480 var meAttributes me.AttributeValueMap
481 opticalMetrics := make(map[string]float32)
482 // Get the ANI-G instance optical power attributes
483 requestedAttributes := me.AttributeValueMap{"OpticalSignalLevel": 0, "TransmitOpticalLevel": 0}
484 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.AniGClassID, anigInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
485 select {
486 case meAttributes = <-mm.opticalMetricsChan:
487 logger.Debugw(ctx, "received optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
488 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
489 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
490 // The metrics will be empty in this case
491 break loop
492 }
493 // Populate metric only if it was enabled.
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800494 for k := range OpticalPowerGroupMetrics {
495 switch k {
496 case "ani_g_instance_id":
497 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
498 opticalMetrics[k] = float32(val.(uint16))
499 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800500 case "transmit_power":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800501 if val, ok := meAttributes["TransmitOpticalLevel"]; ok && val != nil {
502 opticalMetrics[k] = float32(val.(uint16))
503 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800504 case "receive_power":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800505 if val, ok := meAttributes["OpticalSignalLevel"]; ok && val != nil {
506 opticalMetrics[k] = float32(val.(uint16))
507 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800508 default:
509 // do nothing
510 }
511 }
512 }
513 // create slice of metrics given that there could be more than one ANI-G instance and
514 // optical metrics are collected per ANI-G instance
515 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: opticalMetrics}
516 metricInfoSlice = append(metricInfoSlice, &metricInfo)
517 }
518
519 return metricInfoSlice
520}
521
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800522// collectUniStatusMetrics collects UNI status group metric from various MEs (uni-g, pptp and veip).
Girish Gowdrae09a6202021-01-12 18:10:59 -0800523// nolint: gocyclo
524func (mm *onuMetricsManager) collectUniStatusMetrics(ctx context.Context) []*voltha.MetricInformation {
525 logger.Debugw(ctx, "collectUniStatusMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800526 mm.onuMetricsManagerLock.RLock()
527 if !mm.groupMetricMap[UniStatusGroupMetricName].enabled {
528 mm.onuMetricsManagerLock.RUnlock()
529 logger.Debugw(ctx, "uni status group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
530 return nil
531 }
532 mm.onuMetricsManagerLock.RUnlock()
533
Girish Gowdrae09a6202021-01-12 18:10:59 -0800534 var metricInfoSlice []*voltha.MetricInformation
535 metricsContext := make(map[string]string)
536 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
537 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
538 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
539
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800540 raisedTs := time.Now().Unix()
Girish Gowdrae09a6202021-01-12 18:10:59 -0800541 mmd := voltha.MetricMetaData{
542 Title: "UniStatus", // Is this ok to hard code?
543 Ts: float64(raisedTs),
544 Context: metricsContext,
545 DeviceId: mm.pDeviceHandler.deviceID,
546 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
547 SerialNo: mm.pDeviceHandler.device.SerialNumber,
548 }
549
Girish Gowdrae09a6202021-01-12 18:10:59 -0800550 // get the UNI-G instance IDs
551 unigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.UniGClassID)
552loop1:
553 for _, unigInstID := range unigInstKeys {
554 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
555 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
556 unigMetrics := make(map[string]float32)
557 var meAttributes me.AttributeValueMap
558 // Get the UNI-G instance optical power attributes
559 requestedAttributes := me.AttributeValueMap{"AdministrativeState": 0}
560 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.UniGClassID, unigInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
561 // Wait for metrics or timeout
562 select {
563 case meAttributes = <-mm.uniStatusMetricsChan:
564 logger.Debugw(ctx, "received uni-g metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
565 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
566 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
567 // The metrics could be empty in this case
568 break loop1
569 }
570 // Populate metric only if it was enabled.
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800571 for k := range UniStatusGroupMetrics {
572 switch k {
Girish Gowdrae09a6202021-01-12 18:10:59 -0800573 case "uni_admin_state":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800574 if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
575 unigMetrics[k] = float32(val.(byte))
576 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800577 default:
578 // do nothing
579 }
580 }
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800581 var entityID uint32
582 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
583 entityID = uint32(val.(uint16))
584 }
585 // TODO: Rlock needed for reading uniEntityMap?
586 if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
587 unigMetrics["uni_port_no"] = float32(uniPort.portNo)
588 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800589 // create slice of metrics given that there could be more than one UNI-G instance
590 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics}
591 metricInfoSlice = append(metricInfoSlice, &metricInfo)
592 }
593 }
594
595 // get the PPTP instance IDs
596 pptpInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.PhysicalPathTerminationPointEthernetUniClassID)
597loop2:
598 for _, pptpInstID := range pptpInstKeys {
599 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
600 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
601 var meAttributes me.AttributeValueMap
602 pptpMetrics := make(map[string]float32)
603
604 requestedAttributes := me.AttributeValueMap{"SensedType": 0, "OperationalState": 0, "AdministrativeState": 0}
605 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.PhysicalPathTerminationPointEthernetUniClassID, pptpInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
606 // Wait for metrics or timeout
607 select {
608 case meAttributes = <-mm.uniStatusMetricsChan:
609 logger.Debugw(ctx, "received pptp metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
610 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
611 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
612 // The metrics could be empty in this case
613 break loop2
614 }
615
616 // Populate metric only if it was enabled.
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800617 for k := range UniStatusGroupMetrics {
618 switch k {
Girish Gowdrae09a6202021-01-12 18:10:59 -0800619 case "ethernet_type":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800620 if val, ok := meAttributes["SensedType"]; ok && val != nil {
621 pptpMetrics[k] = float32(val.(byte))
622 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800623 case "oper_status":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800624 if val, ok := meAttributes["OperationalState"]; ok && val != nil {
625 pptpMetrics[k] = float32(val.(byte))
626 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800627 case "uni_admin_state":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800628 if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
629 pptpMetrics[k] = float32(val.(byte))
630 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800631 default:
632 // do nothing
633 }
634 }
635 }
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800636 var entityID uint32
637 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
638 entityID = uint32(val.(uint16))
639 }
640 // TODO: Rlock needed for reading uniEntityMap?
641 if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
642 pptpMetrics["uni_port_no"] = float32(uniPort.portNo)
643 }
644
Girish Gowdrae09a6202021-01-12 18:10:59 -0800645 // create slice of metrics given that there could be more than one PPTP instance and
646 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: pptpMetrics}
647 metricInfoSlice = append(metricInfoSlice, &metricInfo)
648 }
649
650 // get the VEIP instance IDs
651 veipInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.VirtualEthernetInterfacePointClassID)
652loop3:
653 for _, veipInstID := range veipInstKeys {
654 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
655 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
656 var meAttributes me.AttributeValueMap
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800657 veipMetrics := make(map[string]float32)
Girish Gowdrae09a6202021-01-12 18:10:59 -0800658
659 requestedAttributes := me.AttributeValueMap{"OperationalState": 0, "AdministrativeState": 0}
660 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
661 // Wait for metrics or timeout
662 select {
663 case meAttributes = <-mm.uniStatusMetricsChan:
664 logger.Debugw(ctx, "received veip metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
665 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
666 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
667 // The metrics could be empty in this case
668 break loop3
669 }
670
671 // Populate metric only if it was enabled.
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800672 for k := range UniStatusGroupMetrics {
673 switch k {
Girish Gowdrae09a6202021-01-12 18:10:59 -0800674 case "oper_status":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800675 if val, ok := meAttributes["OperationalState"]; ok && val != nil {
676 veipMetrics[k] = float32(val.(byte))
677 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800678 case "uni_admin_state":
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800679 if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
680 veipMetrics[k] = float32(val.(byte))
681 }
Girish Gowdrae09a6202021-01-12 18:10:59 -0800682 default:
683 // do nothing
684 }
685 }
686 }
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800687
688 var entityID uint32
689 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
690 entityID = uint32(meAttributes["ManagedEntityId"].(uint16))
691 }
692 // TODO: Rlock needed for reading uniEntityMap?
693 if uniPort, ok := mm.pDeviceHandler.uniEntityMap[entityID]; ok && uniPort != nil {
694 veipMetrics["uni_port_no"] = float32(uniPort.portNo)
695 }
696
Girish Gowdrae09a6202021-01-12 18:10:59 -0800697 // create slice of metrics given that there could be more than one VEIP instance
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800698 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: veipMetrics}
Girish Gowdrae09a6202021-01-12 18:10:59 -0800699 metricInfoSlice = append(metricInfoSlice, &metricInfo)
700 }
701
702 return metricInfoSlice
703}
704
705// publishMetrics publishes the metrics on kafka
706func (mm *onuMetricsManager) publishMetrics(ctx context.Context, metricInfo []*voltha.MetricInformation) {
707 var ke voltha.KpiEvent2
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800708 ts := time.Now().Unix()
Girish Gowdrae09a6202021-01-12 18:10:59 -0800709 ke.SliceData = metricInfo
710 ke.Type = voltha.KpiEventType_slice
711 ke.Ts = float64(ts)
712
713 if err := mm.pDeviceHandler.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, ts); err != nil {
714 logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
715 }
716}
717
718func (mm *onuMetricsManager) processOmciMessages(ctx context.Context) {
719 logger.Infow(ctx, "Start routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
720 // Flush metric collection channels to be safe.
721 // It is possible that there is stale data on this channel if the processOmciMessages routine
722 // is stopped right after issuing a OMCI-GET request and started again.
723 // The processOmciMessages routine will get stopped if startCollector routine (in device_handler.go)
724 // is stopped - as a result of ONU going down.
725 mm.flushMetricCollectionChannels(ctx)
726
727 for {
728 select {
729 case <-mm.stopProcessingOmciResponses: // stop this routine
730 logger.Infow(ctx, "Stop routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
731 return
732 case message, ok := <-mm.commMetricsChan:
733 if !ok {
734 logger.Errorw(ctx, "Message couldn't be read from channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
735 continue
736 }
737 logger.Debugw(ctx, "Received message on ONU metrics channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
738
739 switch message.Type {
740 case OMCI:
741 msg, _ := message.Data.(OmciMessage)
742 mm.handleOmciMessage(ctx, msg)
743 default:
744 logger.Warn(ctx, "Unknown message type received", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "message.Type": message.Type})
745 }
746 }
747 }
748}
749
750func (mm *onuMetricsManager) handleOmciMessage(ctx context.Context, msg OmciMessage) {
751 logger.Debugw(ctx, "omci Msg", log.Fields{"device-id": mm.pDeviceHandler.deviceID,
752 "msgType": msg.OmciMsg.MessageType, "msg": msg})
753 switch msg.OmciMsg.MessageType {
754 case omci.GetResponseType:
755 //TODO: error handling
756 _ = mm.handleOmciGetResponseMessage(ctx, msg)
757
758 default:
759 logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msg.OmciMsg.MessageType})
760
761 }
762}
763
764func (mm *onuMetricsManager) handleOmciGetResponseMessage(ctx context.Context, msg OmciMessage) error {
765 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeGetResponse)
766 if msgLayer == nil {
767 logger.Errorw(ctx, "omci Msg layer could not be detected for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
768 return fmt.Errorf("omci Msg layer could not be detected for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
769 }
770 msgObj, msgOk := msgLayer.(*omci.GetResponse)
771 if !msgOk {
772 logger.Errorw(ctx, "omci Msg layer could not be assigned for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
773 return fmt.Errorf("omci Msg layer could not be assigned for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
774 }
775 logger.Debugw(ctx, "OMCI GetResponse Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
776 if msgObj.Result == me.Success {
777 meAttributes := msgObj.Attributes
778 switch msgObj.EntityClass {
779 case me.AniGClassID:
780 mm.opticalMetricsChan <- meAttributes
781 return nil
782 case me.UniGClassID:
783 mm.uniStatusMetricsChan <- meAttributes
784 return nil
785 case me.PhysicalPathTerminationPointEthernetUniClassID:
786 mm.uniStatusMetricsChan <- meAttributes
787 return nil
788 case me.VirtualEthernetInterfacePointClassID:
789 mm.uniStatusMetricsChan <- meAttributes
790 return nil
791 default:
792 logger.Errorw(ctx, "unhandled omci get response message",
793 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
794 }
795 }
796
797 return errors.New("unhandled-omci-get-response-message")
798}
799
800// flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
801func (mm *onuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
802 // flush commMetricsChan
803 select {
804 case <-mm.commMetricsChan:
805 logger.Debug(ctx, "flushed common metrics channel")
806 default:
807 }
808
809 // flush opticalMetricsChan
810 select {
811 case <-mm.opticalMetricsChan:
812 logger.Debug(ctx, "flushed optical metrics channel")
813 default:
814 }
815
816 // flush uniStatusMetricsChan
817 select {
818 case <-mm.uniStatusMetricsChan:
819 logger.Debug(ctx, "flushed uni status metrics channel")
820 default:
821 }
822}