blob: 984f8abb515db4f63982afc1a3c832a97dabea77 [file] [log] [blame]
Girish Gowdrae09a6202021-01-12 18:10:59 -08001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
Girish Gowdra0e533642021-03-02 22:02:51 -080022 "encoding/json"
Girish Gowdrae09a6202021-01-12 18:10:59 -080023 "fmt"
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +000024 "math"
25 "sync"
26 "time"
27
Girish Gowdrae0140f02021-02-02 16:55:09 -080028 "github.com/looplab/fsm"
Girish Gowdrae09a6202021-01-12 18:10:59 -080029 "github.com/opencord/omci-lib-go"
30 me "github.com/opencord/omci-lib-go/generated"
Girish Gowdra0e533642021-03-02 22:02:51 -080031 "github.com/opencord/voltha-lib-go/v4/pkg/db"
32 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
Girish Gowdrae09a6202021-01-12 18:10:59 -080033 "github.com/opencord/voltha-lib-go/v4/pkg/log"
34 "github.com/opencord/voltha-protos/v4/go/voltha"
Girish Gowdrae09a6202021-01-12 18:10:59 -080035)
36
// Event names, state names and the idle state of the L2 PM FSM.
const (
	// L2 PM FSM events
	l2PmEventInit     = "l2PmEventInit"
	l2PmEventTick     = "l2PmEventTick"
	l2PmEventSuccess  = "l2PmEventSuccess"
	l2PmEventFailure  = "l2PmEventFailure"
	l2PmEventAddMe    = "l2PmEventAddMe"
	l2PmEventDeleteMe = "l2PmEventDeleteMe"
	l2PmEventStop     = "l2PmEventStop"

	// L2 PM FSM states
	l2PmStNull     = "l2PmStNull"
	l2PmStStarting = "l2PmStStarting"
	l2PmStSyncTime = "l2PmStSyncTime"
	l2PmStIdle     = "l2PmStIdle"
	// NOTE(review): this value ("l2PmStCreatePm") does not match the identifier name;
	// it is kept unchanged in case the literal state string is matched elsewhere.
	l2PmStCreatePmMe  = "l2PmStCreatePm"
	l2PmStDeletePmMe  = "l2PmStDeletePmMe"
	l2PmStCollectData = "l2PmStCollectData"

	// cL2PmFsmIdleState is the state the L2 PM FSM rests in between activities.
	cL2PmFsmIdleState = l2PmStIdle
)
59
// Constants governing overall metric collection management.
const (
	// DefaultMetricCollectionFrequency is the default collection interval in seconds (15 min).
	// It can be changed from the VOLTHA NBI PmConfig configuration.
	DefaultMetricCollectionFrequency = 15 * 60
	// GroupMetricEnabled is read-only; it cannot be changed from the VOLTHA NBI.
	GroupMetricEnabled = true
	// DefaultFrequencyOverrideEnabled is read-only; it cannot be changed from the VOLTHA NBI.
	DefaultFrequencyOverrideEnabled = true
	// FrequencyGranularity is the step, in seconds, that every configured frequency
	// must be a multiple of. This setting cannot be changed later.
	FrequencyGranularity = 5
)
67
68// OpticalPowerGroupMetrics are supported optical pm names
69var OpticalPowerGroupMetrics = map[string]voltha.PmConfig_PmType{
Girish Gowdrae20a4f62021-03-09 16:06:23 -080070 "ani_g_instance_id": voltha.PmConfig_CONTEXT,
71 "transmit_power_dBm": voltha.PmConfig_GAUGE,
72 "receive_power_dBm": voltha.PmConfig_GAUGE,
Girish Gowdra5a7c4922021-01-22 18:33:41 -080073}
74
75// OpticalPowerGroupMetrics specific constants
76const (
Girish Gowdrae0140f02021-02-02 16:55:09 -080077 OpticalPowerGroupMetricName = "PON_Optical"
Girish Gowdra5a7c4922021-01-22 18:33:41 -080078 OpticalPowerGroupMetricEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
79 OpticalPowerMetricGroupCollectionFrequency = 5 * 60 // unit in seconds. This setting can be changed from voltha NBI PmConfig configuration
80)
81
82// UniStatusGroupMetrics are supported UNI status names
83var UniStatusGroupMetrics = map[string]voltha.PmConfig_PmType{
84 "uni_port_no": voltha.PmConfig_CONTEXT,
Girish Gowdrada3a52f2021-03-17 11:24:11 -070085 "me_class_id": voltha.PmConfig_CONTEXT,
Girish Gowdra0e533642021-03-02 22:02:51 -080086 "entity_id": voltha.PmConfig_CONTEXT,
Girish Gowdrada3a52f2021-03-17 11:24:11 -070087 "sensed_type": voltha.PmConfig_GAUGE,
Girish Gowdra5a7c4922021-01-22 18:33:41 -080088 "oper_status": voltha.PmConfig_GAUGE,
89 "uni_admin_state": voltha.PmConfig_GAUGE,
90}
91
92// UniStatusGroupMetrics specific constants
93const (
Girish Gowdrae0140f02021-02-02 16:55:09 -080094 UniStatusGroupMetricName = "UNI_Status"
Girish Gowdra5a7c4922021-01-22 18:33:41 -080095 UniStatusGroupMetricEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
96 UniStatusMetricGroupCollectionFrequency = 5 * 60 // unit in seconds. This setting can be changed from voltha NBI PmConfig configuration
97)
98
Girish Gowdrae0140f02021-02-02 16:55:09 -080099// *** Classical L2 PM Counters begin ***
100
101// EthernetBridgeHistory are supported ethernet bridge history counters fetched from
102// Ethernet Frame Performance Monitoring History Data Downstream and Ethernet Frame Performance Monitoring History Data Upstream MEs.
103var EthernetBridgeHistory = map[string]voltha.PmConfig_PmType{
104 "class_id": voltha.PmConfig_CONTEXT,
105 "entity_id": voltha.PmConfig_CONTEXT,
106 "interval_end_time": voltha.PmConfig_CONTEXT,
107 "parent_class_id": voltha.PmConfig_CONTEXT,
108 "parent_entity_id": voltha.PmConfig_CONTEXT,
109 "upstream": voltha.PmConfig_CONTEXT,
110
111 "drop_events": voltha.PmConfig_COUNTER,
112 "octets": voltha.PmConfig_COUNTER,
113 "packets": voltha.PmConfig_COUNTER,
114 "broadcast_packets": voltha.PmConfig_COUNTER,
115 "multicast_packets": voltha.PmConfig_COUNTER,
116 "crc_errored_packets": voltha.PmConfig_COUNTER,
117 "undersize_packets": voltha.PmConfig_COUNTER,
118 "oversize_packets": voltha.PmConfig_COUNTER,
119 "64_octets": voltha.PmConfig_COUNTER,
120 "65_to_127_octets": voltha.PmConfig_COUNTER,
121 "128_to_255_octets": voltha.PmConfig_COUNTER,
122 "256_to_511_octets": voltha.PmConfig_COUNTER,
123 "512_to_1023_octets": voltha.PmConfig_COUNTER,
124 "1024_to_1518_octets": voltha.PmConfig_COUNTER,
125}
126
127// EthernetUniHistory are supported ethernet uni history counters fetched from
128// Ethernet Performance Monitoring History Data ME.
129var EthernetUniHistory = map[string]voltha.PmConfig_PmType{
130 "class_id": voltha.PmConfig_CONTEXT,
131 "entity_id": voltha.PmConfig_CONTEXT,
132 "interval_end_time": voltha.PmConfig_CONTEXT,
133
134 "fcs_errors": voltha.PmConfig_COUNTER,
135 "excessive_collision_counter": voltha.PmConfig_COUNTER,
136 "late_collision_counter": voltha.PmConfig_COUNTER,
137 "frames_too_long": voltha.PmConfig_COUNTER,
138 "buffer_overflows_on_rx": voltha.PmConfig_COUNTER,
139 "buffer_overflows_on_tx": voltha.PmConfig_COUNTER,
140 "single_collision_frame_counter": voltha.PmConfig_COUNTER,
141 "multiple_collisions_frame_counter": voltha.PmConfig_COUNTER,
142 "sqe_counter": voltha.PmConfig_COUNTER,
143 "deferred_tx_counter": voltha.PmConfig_COUNTER,
144 "internal_mac_tx_error_counter": voltha.PmConfig_COUNTER,
145 "carrier_sense_error_counter": voltha.PmConfig_COUNTER,
146 "alignment_error_counter": voltha.PmConfig_COUNTER,
147 "internal_mac_rx_error_counter": voltha.PmConfig_COUNTER,
148}
149
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800150// FecHistory is supported FEC Performance Monitoring History Data related metrics
151var FecHistory = map[string]voltha.PmConfig_PmType{
152 "class_id": voltha.PmConfig_CONTEXT,
153 "entity_id": voltha.PmConfig_CONTEXT,
154 "interval_end_time": voltha.PmConfig_CONTEXT,
155
156 "corrected_bytes": voltha.PmConfig_COUNTER,
157 "corrected_code_words": voltha.PmConfig_COUNTER,
158 "uncorrectable_code_words": voltha.PmConfig_COUNTER,
159 "total_code_words": voltha.PmConfig_COUNTER,
160 "fec_seconds": voltha.PmConfig_COUNTER,
161}
162
163// GemPortHistory is supported GEM Port Network Ctp Performance Monitoring History Data
164// related metrics
165var GemPortHistory = map[string]voltha.PmConfig_PmType{
166 "class_id": voltha.PmConfig_CONTEXT,
167 "entity_id": voltha.PmConfig_CONTEXT,
168 "interval_end_time": voltha.PmConfig_CONTEXT,
169
170 "transmitted_gem_frames": voltha.PmConfig_COUNTER,
171 "received_gem_frames": voltha.PmConfig_COUNTER,
172 "received_payload_bytes": voltha.PmConfig_COUNTER,
173 "transmitted_payload_bytes": voltha.PmConfig_COUNTER,
174 "encryption_key_errors": voltha.PmConfig_COUNTER,
175}
176
// Constants specific to L2 PM collection
const (
	L2PmCollectionInterval = 15 * 60 // Unit in seconds. Do not change: this interval is fixed by the OMCI specification for L2 PM counters
	SyncTimeRetryInterval  = 15      // Unit in seconds
	L2PmCreateAttempts     = 3
	L2PmDeleteAttempts     = 3
	L2PmCollectAttempts    = 3
	// Per Table 11.2.9-1 - OMCI baseline message limitations in the G.988 spec, the max GET Response
	// payload size is 25. We define 24 (one less) to allow for dynamic insertion of the IntervalEndTime
	// attribute (1 byte) in L2 PM GET Requests.
	MaxL2PMGetPayLoadSize = 24
)

// EthernetBridgeHistory specific constants
const (
	EthernetBridgeHistoryName      = "Ethernet_Bridge_Port_History"
	EthernetBridgeHistoryEnabled   = true // This setting can be changed from voltha NBI PmConfig configuration
	EthernetBridgeHistoryFrequency = L2PmCollectionInterval
)

// EthernetUniHistory specific constants
const (
	EthernetUniHistoryName      = "Ethernet_UNI_History"
	EthernetUniHistoryEnabled   = true // This setting can be changed from voltha NBI PmConfig configuration
	EthernetUniHistoryFrequency = L2PmCollectionInterval
)

// FecHistory specific constants
const (
	FecHistoryName      = "FEC_History"
	FecHistoryEnabled   = true // This setting can be changed from voltha NBI PmConfig configuration
	FecHistoryFrequency = L2PmCollectionInterval
)

// GemPortHistory specific constants
const (
	GemPortHistoryName      = "GEM_Port_History"
	GemPortHistoryEnabled   = true // This setting can be changed from voltha NBI PmConfig configuration
	GemPortHistoryFrequency = L2PmCollectionInterval
)
217
// Constants related to the PM data KV store.
const (
	// cPmKvStorePrefix formats the PM KV store base path:
	// <some-base-path>/openonu/pm-data/<onu-device-id>
	cPmKvStorePrefix = "%s/openonu/pm-data/%s"

	// NOTE(review): these appear to label pending/completed add and remove operations on
	// PM ME instances - confirm against their users.
	cPmAdd     = "add"
	cPmAdded   = "added"
	cPmRemove  = "remove"
	cPmRemoved = "removed"
)
226
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800227// Defines the type for generic metric population function
228type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int) error
229
Girish Gowdrae0140f02021-02-02 16:55:09 -0800230// *** Classical L2 PM Counters end ***
231
// pmMEData tracks, for one PM group, which ME instance IDs are active, which are marked
// for deletion and which are marked for addition. The JSON tags define its serialized form.
type pmMEData struct {
	// InstancesActive lists the active ME instance IDs of the group.
	InstancesActive []uint16 `json:"instances_active"`
	// InstancesToDelete lists the ME instance IDs of the group marked for deletion.
	InstancesToDelete []uint16 `json:"instances_to_delete"`
	// InstancesToAdd lists the ME instance IDs of the group marked for addition.
	InstancesToAdd []uint16 `json:"instances_to_add"`
}
237
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800238type groupMetric struct {
239 groupName string
240 enabled bool
241 frequency uint32 // valid only if FrequencyOverride is enabled.
242 metricMap map[string]voltha.PmConfig_PmType
243 nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
Girish Gowdrae0140f02021-02-02 16:55:09 -0800244 isL2PMCounter bool // true for only L2 PM counters
245 collectAttempts uint32 // number of attempts to collect L2 PM data
Girish Gowdra0e533642021-03-02 22:02:51 -0800246 pmMEData *pmMEData
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800247}
248
// standaloneMetric holds the runtime configuration and collection state of one
// metric that is managed individually rather than as part of a group.
type standaloneMetric struct {
	metricName string
	enabled    bool
	// frequency is the collection interval in seconds; only valid if FrequencyOverride is enabled.
	frequency uint32
	// nextCollectionInterval is only valid if FrequencyOverride is enabled.
	nextCollectionInterval time.Time
}
255
Girish Gowdrae09a6202021-01-12 18:10:59 -0800256type onuMetricsManager struct {
257 pDeviceHandler *deviceHandler
Girish Gowdrae0140f02021-02-02 16:55:09 -0800258 pAdaptFsm *AdapterFsm
Girish Gowdrae09a6202021-01-12 18:10:59 -0800259
Girish Gowdrae0140f02021-02-02 16:55:09 -0800260 opticalMetricsChan chan me.AttributeValueMap
261 uniStatusMetricsChan chan me.AttributeValueMap
262 l2PmChan chan me.AttributeValueMap
263 syncTimeResponseChan chan bool // true is success, false is fail
264 l2PmCreateOrDeleteResponseChan chan bool // true is success, false is fail
265
266 activeL2Pms []string // list of active l2 pm MEs created on the ONU.
267 l2PmToDelete []string // list of L2 PMs to delete
268 l2PmToAdd []string // list of L2 PM to add
Girish Gowdrae09a6202021-01-12 18:10:59 -0800269
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800270 groupMetricMap map[string]*groupMetric
271 standaloneMetricMap map[string]*standaloneMetric
272
Girish Gowdrae09a6202021-01-12 18:10:59 -0800273 stopProcessingOmciResponses chan bool
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800274
Girish Gowdrae0140f02021-02-02 16:55:09 -0800275 stopTicks chan bool
276
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800277 nextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
278
279 onuMetricsManagerLock sync.RWMutex
Girish Gowdra0e533642021-03-02 22:02:51 -0800280
281 pmKvStore *db.Backend
Girish Gowdrae09a6202021-01-12 18:10:59 -0800282}
283
284// newonuMetricsManager returns a new instance of the newonuMetricsManager
Girish Gowdra0e533642021-03-02 22:02:51 -0800285// The metrics manager module is responsible for configuration and management of individual and group metrics.
286// Currently all the metrics are managed as a group which fall into two categories - L2 PM and "all others"
287// The L2 PM counters have a fixed 15min interval for PM collection while all other group counters have
288// the collection interval configurable.
289// The global PM config is part of the voltha.Device struct and is backed up on KV store (by rw-core).
290// This module also implements resiliency for L2 PM ME instances that are active/pending-delete/pending-add.
Girish Gowdrae09a6202021-01-12 18:10:59 -0800291func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
292
293 var metricsManager onuMetricsManager
294 logger.Debugw(ctx, "init-onuMetricsManager", log.Fields{"device-id": dh.deviceID})
295 metricsManager.pDeviceHandler = dh
296
Girish Gowdrae0140f02021-02-02 16:55:09 -0800297 commMetricsChan := make(chan Message)
Girish Gowdrae09a6202021-01-12 18:10:59 -0800298 metricsManager.opticalMetricsChan = make(chan me.AttributeValueMap)
299 metricsManager.uniStatusMetricsChan = make(chan me.AttributeValueMap)
Girish Gowdrae0140f02021-02-02 16:55:09 -0800300 metricsManager.l2PmChan = make(chan me.AttributeValueMap)
301
302 metricsManager.syncTimeResponseChan = make(chan bool)
303 metricsManager.l2PmCreateOrDeleteResponseChan = make(chan bool)
304
Girish Gowdrae09a6202021-01-12 18:10:59 -0800305 metricsManager.stopProcessingOmciResponses = make(chan bool)
Girish Gowdrae0140f02021-02-02 16:55:09 -0800306 metricsManager.stopTicks = make(chan bool)
Girish Gowdrae09a6202021-01-12 18:10:59 -0800307
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800308 metricsManager.groupMetricMap = make(map[string]*groupMetric)
309 metricsManager.standaloneMetricMap = make(map[string]*standaloneMetric)
310
311 if dh.pmConfigs == nil { // dh.pmConfigs is NOT nil if adapter comes back from a restart. We should NOT go back to defaults in this case
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800312 metricsManager.initializeAllGroupMetrics()
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800313 }
314
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800315 metricsManager.populateLocalGroupMetricData(ctx)
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800316
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800317 if err := metricsManager.initializeL2PmFsm(ctx, commMetricsChan); err != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -0800318 return nil
319 }
Girish Gowdrae0140f02021-02-02 16:55:09 -0800320
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800321 // initialize the next metric collection intervals.
322 metricsManager.initializeMetricCollectionTime(ctx)
Girish Gowdra0e533642021-03-02 22:02:51 -0800323
324 baseKvStorePath := fmt.Sprintf(cPmKvStorePrefix, dh.pOpenOnuAc.cm.Backend.PathPrefix, dh.deviceID)
325 metricsManager.pmKvStore = dh.setBackend(ctx, baseKvStorePath)
326 if metricsManager.pmKvStore == nil {
327 logger.Errorw(ctx, "Can't initialize pmKvStore - no backend connection to PM module",
328 log.Fields{"device-id": dh.deviceID, "service": baseKvStorePath})
329 return nil
330 }
331
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800332 logger.Info(ctx, "init-onuMetricsManager completed", log.Fields{"device-id": dh.deviceID})
Girish Gowdrae09a6202021-01-12 18:10:59 -0800333 return &metricsManager
334}
335
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800336func (mm *onuMetricsManager) initializeMetricCollectionTime(ctx context.Context) {
337 if mm.pDeviceHandler.pmConfigs.FreqOverride {
338 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to true, then group/standalone metric specific interval applies
339 mm.onuMetricsManagerLock.Lock()
340 defer mm.onuMetricsManagerLock.Unlock()
341 for _, v := range mm.groupMetricMap {
Girish Gowdrae0140f02021-02-02 16:55:09 -0800342 if v.enabled && !v.isL2PMCounter { // L2 PM counter collection is managed in a L2PmFsm
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800343 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
344 }
345 }
346
347 for _, v := range mm.standaloneMetricMap {
348 if v.enabled {
349 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
350 }
351 }
352 } else {
353 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to false, then overall metric specific interval applies
354 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
355 }
356 logger.Infow(ctx, "initialized standalone group/metric collection time", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
357}
358
359func (mm *onuMetricsManager) updateDefaultFrequency(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
360 // Verify that the configured DefaultFrequency is > 0 and is a multiple of FrequencyGranularity
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800361 if pmConfigs.DefaultFreq == 0 || (pmConfigs.DefaultFreq > 0 && pmConfigs.DefaultFreq%FrequencyGranularity != 0) {
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800362 logger.Errorf(ctx, "frequency-%u-should-be-a-multiple-of-%u", pmConfigs.DefaultFreq, FrequencyGranularity)
363 return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", pmConfigs.DefaultFreq, FrequencyGranularity)
364 }
365 mm.pDeviceHandler.pmConfigs.DefaultFreq = pmConfigs.DefaultFreq
366 // re-set the nextGlobalMetricCollectionTime based on the new DefaultFreq
367 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
368 logger.Debugw(ctx, "frequency-updated--new-frequency", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "frequency": mm.pDeviceHandler.pmConfigs.DefaultFreq})
369 return nil
370}
371
372func (mm *onuMetricsManager) updateGroupFreq(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
373 var newGroupFreq uint32
374 found := false
375 groupSliceIdx := 0
376 var group *voltha.PmGroupConfig
377 for groupSliceIdx, group = range pmConfigs.Groups {
378 if group.GroupName == aGroupName {
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800379 // freq 0 is not allowed and it should be multiple of FrequencyGranularity
380 if group.GroupFreq == 0 || (group.GroupFreq > 0 && group.GroupFreq%FrequencyGranularity != 0) {
381 logger.Errorf(ctx, "frequency-%u-should-be-a-multiple-of-%u", group.GroupFreq, FrequencyGranularity)
382 return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", group.GroupFreq, FrequencyGranularity)
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800383 }
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800384 newGroupFreq = group.GroupFreq
385 found = true
386 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800387 }
388 }
389 // if not found update group freq and next collection interval for the group
390 if !found {
391 logger.Errorw(ctx, "group name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
392 return fmt.Errorf("group-name-not-found-%v", aGroupName)
393 }
394
395 updated := false
396 mm.onuMetricsManagerLock.Lock()
397 defer mm.onuMetricsManagerLock.Unlock()
398 for k, v := range mm.groupMetricMap {
Girish Gowdrae0140f02021-02-02 16:55:09 -0800399 if k == aGroupName && !v.isL2PMCounter { // We cannot allow the L2 PM counter frequency to be updated. It is 15min fixed by OMCI spec
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800400 v.frequency = newGroupFreq
401 // update internal pm config
402 mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].GroupFreq = newGroupFreq
403 // Also updated the next group metric collection time from now
404 v.nextCollectionInterval = time.Now().Add(time.Duration(newGroupFreq) * time.Second)
405 updated = true
406 logger.Infow(ctx, "group frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800407 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800408 }
409 }
410 if !updated {
411 logger.Errorw(ctx, "group frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
412 return fmt.Errorf("internal-error-during-group-freq-update--groupname-%s-freq-%d", aGroupName, newGroupFreq)
413 }
414 return nil
415}
416
417func (mm *onuMetricsManager) updateMetricFreq(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
418 var newMetricFreq uint32
419 found := false
420 metricSliceIdx := 0
421 var metric *voltha.PmConfig
422 for metricSliceIdx, metric = range pmConfigs.Metrics {
423 if metric.Name == aMetricName {
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800424 // freq 0 is not allowed and it should be multiple of FrequencyGranularity
425 if metric.SampleFreq == 0 || (metric.SampleFreq > 0 && metric.SampleFreq%FrequencyGranularity != 0) {
426 logger.Errorf(ctx, "frequency-%u-should-be-a-multiple-of-%u", metric.SampleFreq, FrequencyGranularity)
427 return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", metric.SampleFreq, FrequencyGranularity)
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800428 }
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800429 newMetricFreq = metric.SampleFreq
430 found = true
431 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800432 }
433 }
434 if !found {
435 logger.Errorw(ctx, "metric name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
436 return fmt.Errorf("metric-name-not-found-%v", aMetricName)
437 }
438
439 updated := false
440 mm.onuMetricsManagerLock.Lock()
441 defer mm.onuMetricsManagerLock.Unlock()
442 for k, v := range mm.groupMetricMap {
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800443 if k == aMetricName {
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800444 v.frequency = newMetricFreq
445 // update internal pm config
446 mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].SampleFreq = newMetricFreq
447 // Also updated the next standalone metric collection time from now
448 v.nextCollectionInterval = time.Now().Add(time.Duration(newMetricFreq) * time.Second)
449 updated = true
450 logger.Infow(ctx, "metric frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800451 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800452 }
453 }
454 if !updated {
455 logger.Errorw(ctx, "metric frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
456 return fmt.Errorf("internal-error-during-standalone-metric-update--matricnane-%s-freq-%d", aMetricName, newMetricFreq)
457 }
458 return nil
459}
460
461func (mm *onuMetricsManager) updateGroupSupport(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
462 groupSliceIdx := 0
463 var group *voltha.PmGroupConfig
464
465 for groupSliceIdx, group = range pmConfigs.Groups {
466 if group.GroupName == aGroupName {
467 break
468 }
469 }
470 if group == nil {
471 logger.Errorw(ctx, "group metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
472 return fmt.Errorf("group-not-found--groupName-%s", aGroupName)
473 }
474
475 updated := false
476 mm.onuMetricsManagerLock.Lock()
477 defer mm.onuMetricsManagerLock.Unlock()
478 for k, v := range mm.groupMetricMap {
479 if k == aGroupName && v.enabled != group.Enabled {
480 mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].Enabled = group.Enabled
481 v.enabled = group.Enabled
Girish Gowdrae0140f02021-02-02 16:55:09 -0800482 if group.Enabled {
483 if v.isL2PMCounter {
484 // If it is a L2 PM counter we need to mark the PM to be added
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800485 mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, v.groupName)
Girish Gowdrae0140f02021-02-02 16:55:09 -0800486 // If the group support flag toggles too soon, we need to delete the group name from l2PmToDelete slice
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800487 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, v.groupName)
488
489 // The GemPortHistory group requires some special handling as the instance IDs are not pre-defined
490 // unlike other L2 PM counters. We need to fetch the active gemport instance IDs in the system to
491 // take further action
492 if v.groupName == GemPortHistoryName {
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800493 mm.updateGemPortNTPInstanceToAddForPerfMonitoring(ctx)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800494 }
Girish Gowdrae0140f02021-02-02 16:55:09 -0800495 } else if mm.pDeviceHandler.pmConfigs.FreqOverride { // otherwise just update the next collection interval
496 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
497 }
498 } else { // group counter is disabled
499 if v.isL2PMCounter {
500 // If it is a L2 PM counter we need to mark the PM to be deleted
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800501 mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, v.groupName)
Girish Gowdrae0140f02021-02-02 16:55:09 -0800502 // If the group support flag toggles too soon, we need to delete the group name from l2PmToAdd slice
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800503 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, v.groupName)
504
505 // The GemPortHistory group requires some special handling as the instance IDs are not pre-defined
506 // unlike other L2 PM counters. We need to fetch the active gemport instance IDs in the system to
507 // take further action
508 if v.groupName == GemPortHistoryName {
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800509 mm.updateGemPortNTPInstanceToDeleteForPerfMonitoring(ctx)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800510 }
Girish Gowdrae0140f02021-02-02 16:55:09 -0800511 }
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800512 }
513 updated = true
Girish Gowdrae0140f02021-02-02 16:55:09 -0800514 if v.isL2PMCounter {
515 logger.Infow(ctx, "l2 pm group metric support updated",
516 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName, "enabled": group.Enabled, "l2PmToAdd": mm.l2PmToAdd, "l2PmToDelete": mm.l2PmToDelete})
517 } else {
518 logger.Infow(ctx, "group metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName, "enabled": group.Enabled})
519 }
520 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800521 }
522 }
523
524 if !updated {
525 logger.Errorw(ctx, "group metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
526 return fmt.Errorf("internal-error-during-group-support-update--groupName-%s", aGroupName)
527 }
528 return nil
529}
530
531func (mm *onuMetricsManager) updateMetricSupport(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
532 metricSliceIdx := 0
533 var metric *voltha.PmConfig
534
535 for metricSliceIdx, metric = range pmConfigs.Metrics {
536 if metric.Name == aMetricName {
537 break
538 }
539 }
540
541 if metric == nil {
542 logger.Errorw(ctx, "standalone metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
543 return fmt.Errorf("metric-not-found--metricname-%s", aMetricName)
544 }
545
546 updated := false
547 mm.onuMetricsManagerLock.Lock()
548 defer mm.onuMetricsManagerLock.Unlock()
549 for k, v := range mm.standaloneMetricMap {
550 if k == aMetricName && v.enabled != metric.Enabled {
551 mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].Enabled = metric.Enabled
552 v.enabled = metric.Enabled
553 // If the standalone metric is now enabled and frequency override is enabled, set the next metric collection time
554 if metric.Enabled && mm.pDeviceHandler.pmConfigs.FreqOverride {
555 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
556 }
557 updated = true
558 logger.Infow(ctx, "standalone metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName, "enabled": metric.Enabled})
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800559 break
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800560 }
561 }
562 if !updated {
563 logger.Errorw(ctx, "standalone metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
564 return fmt.Errorf("internal-error-during-standalone-support-update--metricname-%s", aMetricName)
565 }
566 return nil
567}
568
569func (mm *onuMetricsManager) collectAllGroupAndStandaloneMetrics(ctx context.Context) {
570 if mm.pDeviceHandler.pmConfigs.Grouped { // metrics are managed as a group.
571 go mm.collectAllGroupMetrics(ctx)
572 } else {
573 go mm.collectAllStandaloneMetrics(ctx)
574 }
575}
576
577func (mm *onuMetricsManager) collectAllGroupMetrics(ctx context.Context) {
578 go func() {
579 logger.Debug(ctx, "startCollector before collecting optical metrics")
580 metricInfo := mm.collectOpticalMetrics(ctx)
581 if metricInfo != nil {
582 mm.publishMetrics(ctx, metricInfo)
583 }
584 }()
585
586 go func() {
587 logger.Debug(ctx, "startCollector before collecting uni metrics")
588 metricInfo := mm.collectUniStatusMetrics(ctx)
589 if metricInfo != nil {
590 mm.publishMetrics(ctx, metricInfo)
591 }
592 }()
593
594 // Add more here
595}
596
597func (mm *onuMetricsManager) collectAllStandaloneMetrics(ctx context.Context) {
598 // None exists as of now, add when available here
599}
600
601func (mm *onuMetricsManager) collectGroupMetric(ctx context.Context, groupName string) {
602 switch groupName {
603 case OpticalPowerGroupMetricName:
604 go func() {
605 if mi := mm.collectOpticalMetrics(ctx); mm != nil {
606 mm.publishMetrics(ctx, mi)
607 }
608 }()
609 case UniStatusGroupMetricName:
610 go func() {
611 if mi := mm.collectUniStatusMetrics(ctx); mm != nil {
612 mm.publishMetrics(ctx, mi)
613 }
614 }()
615 default:
616 logger.Errorw(ctx, "unhandled group metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName})
617 }
618}
619
// collectStandaloneMetric collects the single standalone metric identified by metricName.
// No standalone metrics exist yet, so every name is reported as unhandled; dispatch on
// metricName here once one becomes available.
func (mm *onuMetricsManager) collectStandaloneMetric(ctx context.Context, metricName string) {
	logger.Errorw(ctx, "unhandled standalone metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": metricName})
}
627
// collectOpticalMetrics collects the optical power group metrics from every ANI-G ME instance.
//
// For each ANI-G instance an OMCI get for OpticalSignalLevel and TransmitOpticalLevel is sent and
// the response is awaited on opticalMetricsChan. One voltha.MetricInformation entry is returned per
// instance that answered; all entries share the same metadata. Instances for which sendGetMe
// returned nil are skipped (no empty entry is published). A response timeout stops the collection
// for the remaining instances.
//
// Returns nil when the optical power group metric is disabled.
func (mm *onuMetricsManager) collectOpticalMetrics(ctx context.Context) []*voltha.MetricInformation {
	logger.Debugw(ctx, "collectOpticalMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})

	mm.onuMetricsManagerLock.RLock()
	if !mm.groupMetricMap[OpticalPowerGroupMetricName].enabled {
		mm.onuMetricsManagerLock.RUnlock()
		logger.Debugw(ctx, "optical power group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		return nil
	}
	mm.onuMetricsManagerLock.RUnlock()

	var metricInfoSlice []*voltha.MetricInformation
	metricsContext := make(map[string]string)
	metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
	metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
	metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType

	raisedTs := time.Now().Unix()
	mmd := voltha.MetricMetaData{
		Title:           OpticalPowerGroupMetricName,
		Ts:              float64(raisedTs),
		Context:         metricsContext,
		DeviceId:        mm.pDeviceHandler.deviceID,
		LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
		SerialNo:        mm.pDeviceHandler.device.SerialNumber,
	}

	// toDBm converts a raw ANI-G optical level to dBm rounded off to a single decimal place.
	// The raw value is taken as a two's complement 16-bit integer scaled by 1/500 (0.002 dB steps).
	toDBm := func(raw uint16) float32 {
		return float32(math.Round((float64(mm.twosComplementToSignedInt16(raw))/500.0)*10) / 10)
	}

	// get the ANI-G instance IDs
	anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
loop:
	for _, anigInstID := range anigInstKeys {
		// Get the ANI-G instance optical power attributes
		requestedAttributes := me.AttributeValueMap{"OpticalSignalLevel": 0, "TransmitOpticalLevel": 0}
		meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.AniGClassID, anigInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan)
		if meInstance == nil {
			// The get request was not sent: skip this instance instead of publishing an entry
			// with an empty metrics map.
			continue
		}

		var meAttributes me.AttributeValueMap
		select {
		case meAttributes = <-mm.opticalMetricsChan:
			logger.Debugw(ctx, "received optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// No entry is produced for this or any remaining ANI-G instance.
			break loop
		}

		// Populate every metric defined for the optical power group from the received attributes.
		opticalMetrics := make(map[string]float32)
		for k := range OpticalPowerGroupMetrics {
			switch k {
			case "ani_g_instance_id":
				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
					opticalMetrics[k] = float32(val.(uint16))
				}
			case "transmit_power_dBm":
				if val, ok := meAttributes["TransmitOpticalLevel"]; ok && val != nil {
					opticalMetrics[k] = toDBm(val.(uint16))
				}
			case "receive_power_dBm":
				if val, ok := meAttributes["OpticalSignalLevel"]; ok && val != nil {
					opticalMetrics[k] = toDBm(val.(uint16))
				}
			default:
				// do nothing
			}
		}
		// create slice of metrics given that there could be more than one ANI-G instance and
		// optical metrics are collected per ANI-G instance
		metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: opticalMetrics}
		metricInfoSlice = append(metricInfoSlice, &metricInfo)
	}

	return metricInfoSlice
}
701
// collectUniStatusMetrics collects the UNI status group metric from the UNI-G, PPTP (Ethernet UNI)
// and VEIP ME instances listed in the ONU MIB DB.
//
// For every instance an OMCI get is sent and its response awaited on uniStatusMetricsChan. One
// voltha.MetricInformation entry (all sharing the same metadata) is produced per instance that
// answered. Instances for which sendGetMe returned nil are skipped for all three ME classes. A
// response timeout skips the remaining instances of that ME class only.
//
// Returns nil when the UNI status group metric is disabled.
// nolint: gocyclo
func (mm *onuMetricsManager) collectUniStatusMetrics(ctx context.Context) []*voltha.MetricInformation {
	logger.Debugw(ctx, "collectUniStatusMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
	mm.onuMetricsManagerLock.RLock()
	if !mm.groupMetricMap[UniStatusGroupMetricName].enabled {
		mm.onuMetricsManagerLock.RUnlock()
		logger.Debugw(ctx, "uni status group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		return nil
	}
	mm.onuMetricsManagerLock.RUnlock()

	var metricInfoSlice []*voltha.MetricInformation
	metricsContext := make(map[string]string)
	metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
	metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
	metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType

	raisedTs := time.Now().Unix()
	mmd := voltha.MetricMetaData{
		Title:           "UniStatus", // Is this ok to hard code?
		Ts:              float64(raisedTs),
		Context:         metricsContext,
		DeviceId:        mm.pDeviceHandler.deviceID,
		LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
		SerialNo:        mm.pDeviceHandler.device.SerialNumber,
	}

	// get the UNI-G instance IDs
	unigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.UniGClassID)
loop1:
	for _, unigInstID := range unigInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		// Get the UNI-G instance administrative state
		requestedAttributes := me.AttributeValueMap{"AdministrativeState": 0}
		meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.UniGClassID, unigInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan)
		if meInstance == nil {
			continue // get request not sent: no entry for this instance
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received uni-g metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// Skip the remaining UNI-G instances.
			break loop1
		}
		unigMetrics := make(map[string]float32)
		// Populate the UNI status metrics that UNI-G provides.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					unigMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		mm.populateUniStatusEntityMetrics(unigMetrics, meAttributes, me.UniGClassID)

		// create slice of metrics given that there could be more than one UNI-G instance
		metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics}
		metricInfoSlice = append(metricInfoSlice, &metricInfo)
	}

	// get the PPTP instance IDs
	pptpInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.PhysicalPathTerminationPointEthernetUniClassID)
loop2:
	for _, pptpInstID := range pptpInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		requestedAttributes := me.AttributeValueMap{"SensedType": 0, "OperationalState": 0, "AdministrativeState": 0}
		meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.PhysicalPathTerminationPointEthernetUniClassID, pptpInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan)
		if meInstance == nil {
			// get request not sent: do not publish an entry that only carries entity/class info
			continue
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received pptp metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// Skip the remaining PPTP instances.
			break loop2
		}
		pptpMetrics := make(map[string]float32)
		// Populate the UNI status metrics that PPTP provides.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "sensed_type":
				if val, ok := meAttributes["SensedType"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			case "oper_status":
				if val, ok := meAttributes["OperationalState"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		mm.populateUniStatusEntityMetrics(pptpMetrics, meAttributes, me.PhysicalPathTerminationPointEthernetUniClassID)

		// create slice of metrics given that there could be more than one PPTP instance
		metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: pptpMetrics}
		metricInfoSlice = append(metricInfoSlice, &metricInfo)
	}

	// get the VEIP instance IDs
	veipInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.VirtualEthernetInterfacePointClassID)
loop3:
	for _, veipInstID := range veipInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		requestedAttributes := me.AttributeValueMap{"OperationalState": 0, "AdministrativeState": 0}
		meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan)
		if meInstance == nil {
			// get request not sent: do not publish an entry that only carries entity/class info
			continue
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received veip metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// Skip the remaining VEIP instances.
			break loop3
		}
		veipMetrics := make(map[string]float32)
		// Populate the UNI status metrics that VEIP provides.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "oper_status":
				if val, ok := meAttributes["OperationalState"]; ok && val != nil {
					veipMetrics[k] = float32(val.(byte))
				}
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					veipMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		mm.populateUniStatusEntityMetrics(veipMetrics, meAttributes, me.VirtualEthernetInterfacePointClassID)

		// create slice of metrics given that there could be more than one VEIP instance
		metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: veipMetrics}
		metricInfoSlice = append(metricInfoSlice, &metricInfo)
	}

	return metricInfoSlice
}

// populateUniStatusEntityMetrics adds the "entity_id" metric (from the ManagedEntityId attribute),
// the "uni_port_no" metric when that entity ID matches an entry of the device handler's
// uniEntityMap, and the "me_class_id" metric set to classID.
func (mm *onuMetricsManager) populateUniStatusEntityMetrics(metrics map[string]float32, meAttributes me.AttributeValueMap, classID me.ClassID) {
	if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
		entityID := val.(uint16)
		metrics["entity_id"] = float32(entityID)
		// TODO: Rlock needed for reading uniEntityMap? May not be needed given uniEntityMap is populated setup at initial ONU bring up
		for _, uni := range mm.pDeviceHandler.uniEntityMap {
			if uni.entityID == entityID {
				metrics["uni_port_no"] = float32(uni.portNo)
				break
			}
		}
	}
	metrics["me_class_id"] = float32(classID)
}
897
898// publishMetrics publishes the metrics on kafka
899func (mm *onuMetricsManager) publishMetrics(ctx context.Context, metricInfo []*voltha.MetricInformation) {
900 var ke voltha.KpiEvent2
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800901 ts := time.Now().Unix()
Girish Gowdrae09a6202021-01-12 18:10:59 -0800902 ke.SliceData = metricInfo
903 ke.Type = voltha.KpiEventType_slice
904 ke.Ts = float64(ts)
905
906 if err := mm.pDeviceHandler.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, ts); err != nil {
907 logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
908 }
909}
910
911func (mm *onuMetricsManager) processOmciMessages(ctx context.Context) {
912 logger.Infow(ctx, "Start routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
913 // Flush metric collection channels to be safe.
914 // It is possible that there is stale data on this channel if the processOmciMessages routine
915 // is stopped right after issuing a OMCI-GET request and started again.
916 // The processOmciMessages routine will get stopped if startCollector routine (in device_handler.go)
917 // is stopped - as a result of ONU going down.
918 mm.flushMetricCollectionChannels(ctx)
919
920 for {
921 select {
922 case <-mm.stopProcessingOmciResponses: // stop this routine
923 logger.Infow(ctx, "Stop routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
924 return
Girish Gowdrae0140f02021-02-02 16:55:09 -0800925 case message, ok := <-mm.pAdaptFsm.commChan:
Girish Gowdrae09a6202021-01-12 18:10:59 -0800926 if !ok {
927 logger.Errorw(ctx, "Message couldn't be read from channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
928 continue
929 }
930 logger.Debugw(ctx, "Received message on ONU metrics channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
931
932 switch message.Type {
933 case OMCI:
934 msg, _ := message.Data.(OmciMessage)
935 mm.handleOmciMessage(ctx, msg)
936 default:
937 logger.Warn(ctx, "Unknown message type received", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "message.Type": message.Type})
938 }
939 }
940 }
941}
942
// handleOmciMessage routes an OMCI message to the handler for its message type
// (get, synchronize-time, create or delete response). Unknown message types are logged.
func (mm *onuMetricsManager) handleOmciMessage(ctx context.Context, msg OmciMessage) {
	msgType := msg.OmciMsg.MessageType
	logger.Debugw(ctx, "omci Msg", log.Fields{"device-id": mm.pDeviceHandler.deviceID,
		"msgType": msgType, "msg": msg})

	// TODO: error handling - the handlers' errors are currently discarded.
	var err error
	switch msgType {
	case omci.GetResponseType:
		err = mm.handleOmciGetResponseMessage(ctx, msg)
	case omci.SynchronizeTimeResponseType:
		err = mm.handleOmciSynchronizeTimeResponseMessage(ctx, msg)
	case omci.CreateResponseType:
		err = mm.handleOmciCreateResponseMessage(ctx, msg)
	case omci.DeleteResponseType:
		err = mm.handleOmciDeleteResponseMessage(ctx, msg)
	default:
		logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msgType})
	}
	_ = err
}
961
962func (mm *onuMetricsManager) handleOmciGetResponseMessage(ctx context.Context, msg OmciMessage) error {
963 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeGetResponse)
964 if msgLayer == nil {
965 logger.Errorw(ctx, "omci Msg layer could not be detected for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
966 return fmt.Errorf("omci Msg layer could not be detected for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
967 }
968 msgObj, msgOk := msgLayer.(*omci.GetResponse)
969 if !msgOk {
970 logger.Errorw(ctx, "omci Msg layer could not be assigned for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
971 return fmt.Errorf("omci Msg layer could not be assigned for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
972 }
973 logger.Debugw(ctx, "OMCI GetResponse Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
974 if msgObj.Result == me.Success {
975 meAttributes := msgObj.Attributes
976 switch msgObj.EntityClass {
977 case me.AniGClassID:
978 mm.opticalMetricsChan <- meAttributes
979 return nil
980 case me.UniGClassID:
981 mm.uniStatusMetricsChan <- meAttributes
982 return nil
983 case me.PhysicalPathTerminationPointEthernetUniClassID:
984 mm.uniStatusMetricsChan <- meAttributes
985 return nil
986 case me.VirtualEthernetInterfacePointClassID:
987 mm.uniStatusMetricsChan <- meAttributes
988 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -0800989 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
990 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800991 me.EthernetPerformanceMonitoringHistoryDataClassID,
992 me.FecPerformanceMonitoringHistoryDataClassID,
993 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -0800994 mm.l2PmChan <- meAttributes
Girish Gowdrae09a6202021-01-12 18:10:59 -0800995 default:
996 logger.Errorw(ctx, "unhandled omci get response message",
997 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
998 }
999 }
1000
Girish Gowdrae0140f02021-02-02 16:55:09 -08001001 return fmt.Errorf("unhandled-omci-get-response-message")
1002}
1003
1004func (mm *onuMetricsManager) handleOmciSynchronizeTimeResponseMessage(ctx context.Context, msg OmciMessage) error {
1005 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeSynchronizeTimeResponse)
1006 if msgLayer == nil {
1007 logger.Errorw(ctx, "omci Msg layer could not be detected for synchronize time response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1008 return fmt.Errorf("omci Msg layer could not be detected for synchronize time response - handling stopped: %s", mm.pDeviceHandler.deviceID)
1009 }
1010 msgObj, msgOk := msgLayer.(*omci.SynchronizeTimeResponse)
1011 if !msgOk {
1012 logger.Errorw(ctx, "omci Msg layer could not be assigned for synchronize time response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1013 return fmt.Errorf("omci Msg layer could not be assigned for synchronize time response - handling stopped: %s", mm.pDeviceHandler.deviceID)
1014 }
1015 logger.Debugw(ctx, "OMCI synchronize time response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
1016 if msgObj.Result == me.Success {
1017 switch msgObj.EntityClass {
1018 case me.OnuGClassID:
1019 logger.Infow(ctx, "omci synchronize time success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1020 mm.syncTimeResponseChan <- true
1021 return nil
1022 default:
1023 logger.Errorw(ctx, "unhandled omci message",
1024 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
1025 }
1026 }
1027 mm.syncTimeResponseChan <- false
1028 logger.Errorf(ctx, "unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
1029 return fmt.Errorf("unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
Girish Gowdrae09a6202021-01-12 18:10:59 -08001030}
1031
1032// flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
1033func (mm *onuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
1034 // flush commMetricsChan
1035 select {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001036 case <-mm.pAdaptFsm.commChan:
Girish Gowdrae09a6202021-01-12 18:10:59 -08001037 logger.Debug(ctx, "flushed common metrics channel")
1038 default:
1039 }
1040
1041 // flush opticalMetricsChan
1042 select {
1043 case <-mm.opticalMetricsChan:
1044 logger.Debug(ctx, "flushed optical metrics channel")
1045 default:
1046 }
1047
1048 // flush uniStatusMetricsChan
1049 select {
1050 case <-mm.uniStatusMetricsChan:
1051 logger.Debug(ctx, "flushed uni status metrics channel")
1052 default:
1053 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001054
1055 // flush syncTimeResponseChan
1056 select {
1057 case <-mm.syncTimeResponseChan:
1058 logger.Debug(ctx, "flushed sync time response channel")
1059 default:
1060 }
1061
1062 // flush l2PmChan
1063 select {
1064 case <-mm.l2PmChan:
1065 logger.Debug(ctx, "flushed L2 PM collection channel")
1066 default:
1067 }
1068
1069 // flush stopTicks
1070 select {
1071 case <-mm.stopTicks:
1072 logger.Debug(ctx, "flushed stopTicks channel")
1073 default:
1074 }
1075
1076}
1077
1078// ** L2 PM FSM Handlers start **
1079
1080func (mm *onuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
Girish Gowdra0e533642021-03-02 22:02:51 -08001081 // restore data from KV store
1082 if err := mm.restorePmData(ctx); err != nil {
1083 logger.Errorw(ctx, "error restoring pm data", log.Fields{"err": err})
1084 // we continue given that it does not effect the actual services for the ONU,
1085 // but there may be some negative effect on PM collection (there may be some mismatch in
1086 // the actual PM config and what is present on the device).
1087 }
1088
Girish Gowdrae0140f02021-02-02 16:55:09 -08001089 // Loop through all the group metrics
1090 // If it is a L2 PM Interval metric and it is enabled, then if it is not in the
1091 // list of active L2 PM list then mark it for creation
1092 // It it is a L2 PM Interval metric and it is disabled, then if it is in the
1093 // list of active L2 PM list then mark it for deletion
1094 mm.onuMetricsManagerLock.Lock()
1095 for n, g := range mm.groupMetricMap {
1096 if g.isL2PMCounter { // it is a l2 pm counter
1097 if g.enabled { // metric enabled.
1098 found := false
1099 inner1:
1100 for _, v := range mm.activeL2Pms {
1101 if v == n {
1102 found = true // metric already present in active l2 pm list
1103 break inner1
1104 }
1105 }
1106 if !found { // metric not in active l2 pm list. Mark this to be added later
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001107 mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, n)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001108 }
1109 } else { // metric not enabled.
1110 found := false
1111 inner2:
1112 for _, v := range mm.activeL2Pms {
1113 if v == n {
1114 found = true // metric is found in active l2 pm list
1115 break inner2
1116 }
1117 }
1118 if found { // metric is found in active l2 pm list. Mark this to be deleted later
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001119 mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, n)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001120 }
1121 }
1122 }
1123 }
1124 mm.onuMetricsManagerLock.Unlock()
1125 logger.Debugw(ctx, "pms to add and delete",
1126 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": mm.l2PmToAdd, "pms-to-delete": mm.l2PmToDelete})
1127 go func() {
1128 // push a tick event to move to next state
1129 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventTick); err != nil {
1130 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1131 }
1132 }()
1133}
1134
// l2PMFsmSyncTime is the L2 PM FSM callback for the sync-time state.
//
// It synchronizes time with the ONU to establish the 15 min boundary for PM collection.
// If the sync fails, it only schedules l2PmEventFailure after SyncTimeRetryInterval seconds so
// the FSM attempts the sync again. If the sync succeeds, it starts the tick generator and fires
// l2PmEventSuccess from a goroutine.
func (mm *onuMetricsManager) l2PMFsmSyncTime(ctx context.Context, e *fsm.Event) {
	if err := mm.syncTime(ctx); err != nil {
		go func() {
			time.Sleep(SyncTimeRetryInterval * time.Second) // retry to sync time after this timeout
			// This will result in FSM attempting to sync time again
			if err := mm.pAdaptFsm.pFsm.Event(l2PmEventFailure); err != nil {
				logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
			}
		}()
		// Stop here: continuing would report success for a failed sync and start one more
		// tick generator every time the sync is retried.
		return
	}

	// Initiate a tick generation routine every L2PmCollectionInterval
	go mm.generateTicks(ctx)

	go func() {
		if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
			logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
		}
	}()
}
1155
1156func (mm *onuMetricsManager) l2PMFsmNull(ctx context.Context, e *fsm.Event) {
1157 // We need to reset the local data so that the L2 PM MEs are re-provisioned once the ONU is back up based on the latest PM CONFIG
1158 mm.onuMetricsManagerLock.Lock()
1159 mm.activeL2Pms = nil
1160 mm.l2PmToAdd = nil
1161 mm.l2PmToDelete = nil
1162 mm.onuMetricsManagerLock.Unlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001163 // If the FSM was stopped, then clear PM data from KV store
1164 // The FSM is stopped when ONU goes down. It is time to clear its data from store
1165 if e.Event == l2PmEventStop {
1166 _ = mm.clearPmGroupData(ctx) // ignore error
1167 }
1168
Girish Gowdrae0140f02021-02-02 16:55:09 -08001169}
1170func (mm *onuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
1171 logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1172
1173 mm.onuMetricsManagerLock.RLock()
1174 numOfPmToDelete := len(mm.l2PmToDelete)
1175 numOfPmToAdd := len(mm.l2PmToAdd)
1176 mm.onuMetricsManagerLock.RUnlock()
1177
1178 if numOfPmToDelete > 0 {
1179 logger.Debugw(ctx, "state idle - pms to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": numOfPmToDelete})
1180 go func() {
1181 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventDeleteMe); err != nil {
1182 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1183 }
1184 }()
1185 } else if numOfPmToAdd > 0 {
1186 logger.Debugw(ctx, "state idle - pms to add", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": numOfPmToAdd})
1187 go func() {
1188 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventAddMe); err != nil {
1189 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1190 }
1191 }()
1192 }
1193}
1194
1195func (mm *onuMetricsManager) l2PmFsmCollectData(ctx context.Context, e *fsm.Event) {
1196 logger.Debugw(ctx, "state collect data", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1197 // Copy the activeL2Pms for which we want to collect the metrics since activeL2Pms can change dynamically
1198 mm.onuMetricsManagerLock.RLock()
1199 copyOfActiveL2Pms := make([]string, len(mm.activeL2Pms))
1200 _ = copy(copyOfActiveL2Pms, mm.activeL2Pms)
1201 mm.onuMetricsManagerLock.RUnlock()
1202
1203 for _, n := range copyOfActiveL2Pms {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001204 var metricInfoSlice []*voltha.MetricInformation
Girish Gowdra0e533642021-03-02 22:02:51 -08001205
1206 // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
1207 mm.onuMetricsManagerLock.RLock()
1208 copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
1209 _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
1210 mm.onuMetricsManagerLock.RUnlock()
1211
Girish Gowdrae0140f02021-02-02 16:55:09 -08001212 switch n {
1213 case EthernetBridgeHistoryName:
1214 logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra0e533642021-03-02 22:02:51 -08001215 for _, entityID := range copyOfEntityIDs {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001216 if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
1217 metricInfoSlice = append(metricInfoSlice, metricInfo)
1218 }
1219 if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, false, entityID); metricInfo != nil { // downstream
1220 metricInfoSlice = append(metricInfoSlice, metricInfo)
1221 }
1222 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001223 case EthernetUniHistoryName:
1224 logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra0e533642021-03-02 22:02:51 -08001225 for _, entityID := range copyOfEntityIDs {
1226 if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
1227 metricInfoSlice = append(metricInfoSlice, metricInfo)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001228 }
1229 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001230
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001231 case FecHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001232 for _, entityID := range copyOfEntityIDs {
1233 if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001234 metricInfoSlice = append(metricInfoSlice, metricInfo)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001235 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001236 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001237 case GemPortHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001238 for _, entityID := range copyOfEntityIDs {
1239 if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001240 metricInfoSlice = append(metricInfoSlice, metricInfo)
1241 }
1242 }
1243
Girish Gowdrae0140f02021-02-02 16:55:09 -08001244 default:
1245 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1246 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001247 mm.handleMetricsPublish(ctx, n, metricInfoSlice)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001248 }
1249 // Does not matter we send success or failure here.
1250 // Those PMs that we failed to collect data will be attempted to collect again in the next PM collection cycle (assuming
1251 // we have not exceed max attempts to collect the PM data)
1252 go func() {
1253 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1254 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1255 }
1256 }()
1257}
1258
Girish Gowdra0e533642021-03-02 22:02:51 -08001259// nolint: gocyclo
Girish Gowdrae0140f02021-02-02 16:55:09 -08001260func (mm *onuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) {
1261 // Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
1262 mm.onuMetricsManagerLock.RLock()
1263 copyOfL2PmToAdd := make([]string, len(mm.l2PmToAdd))
1264 _ = copy(copyOfL2PmToAdd, mm.l2PmToAdd)
1265 mm.onuMetricsManagerLock.RUnlock()
1266
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001267 logger.Debugw(ctx, "state create pm - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": copyOfL2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001268 for _, n := range copyOfL2PmToAdd {
1269 resp := false
Girish Gowdra0e533642021-03-02 22:02:51 -08001270 atLeastOneSuccess := false // flag indicates if at least one ME instance of the PM was successfully created.
1271 cnt := 0
Girish Gowdrae0140f02021-02-02 16:55:09 -08001272 switch n {
1273 case EthernetBridgeHistoryName:
1274 boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
1275 boolForDirection = append(boolForDirection, true, false)
1276 // Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
1277 for _, direction := range boolForDirection {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001278 for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
1279 // Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
1280 entityID := macBridgePortAniEID + uniPort.entityID
Girish Gowdra0e533642021-03-02 22:02:51 -08001281 _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
1282 inner1:
1283 // retry L2PmCreateAttempts times to create the instance of PM
1284 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1285 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001286 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001287 if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); resp {
1288 atLeastOneSuccess = true
1289 _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
1290 break inner1
1291 }
1292 }
1293 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1294 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001295 }
1296 }
1297 }
1298 case EthernetUniHistoryName:
Girish Gowdrae0140f02021-02-02 16:55:09 -08001299 for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
1300 if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
Girish Gowdra0e533642021-03-02 22:02:51 -08001301 // Attach the EthernetPerformanceMonitoringHistoryData ME to PPTP port instance
Girish Gowdrae0140f02021-02-02 16:55:09 -08001302 entityID := uniPort.entityID
Girish Gowdra0e533642021-03-02 22:02:51 -08001303 _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
1304 inner2:
1305 // retry L2PmCreateAttempts times to create the instance of PM
1306 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1307 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001308 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001309 if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); resp {
1310 atLeastOneSuccess = true
1311 _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
1312 break inner2
1313 }
1314 }
1315 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1316 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001317 }
1318 }
1319 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001320 case FecHistoryName:
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001321 for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
Girish Gowdra0e533642021-03-02 22:02:51 -08001322 // Attach the FecPerformanceMonitoringHistoryData ME to the ANI-G ME instance
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001323 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001324 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, anigInstID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001325 _ = mm.updatePmData(ctx, n, anigInstID, cPmAdd) // TODO: ignore error for now
1326 inner3:
1327 // retry L2PmCreateAttempts times to create the instance of PM
1328 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1329 if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); resp {
1330 atLeastOneSuccess = true
1331 _ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
1332 break inner3
1333 }
1334 }
1335 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1336 _ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001337 }
1338 }
1339 case GemPortHistoryName:
1340
1341 mm.onuMetricsManagerLock.RLock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001342 copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd))
1343 _ = copy(copyOfGemPortInstIDsToAdd, mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001344 mm.onuMetricsManagerLock.RUnlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001345
1346 if len(copyOfGemPortInstIDsToAdd) == 0 {
1347 // If there are no gemport history MEs to be created, just skip further processing
1348 // Otherwise down below (after 'switch' case handling) we assume the ME creation failed because resp and atLeastOneSuccess flag are false.
1349 // Normally there are no GemPortHistory MEs to create at start up. They come in only after provisioning service on the ONU.
1350 mm.onuMetricsManagerLock.Lock()
1351 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
1352 mm.onuMetricsManagerLock.Unlock()
1353 continue
1354 }
1355
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001356 for _, v := range copyOfGemPortInstIDsToAdd {
1357 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001358 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, v)
Girish Gowdra0e533642021-03-02 22:02:51 -08001359 _ = mm.updatePmData(ctx, n, v, cPmAdd) // TODO: ignore error for now
1360 inner4:
1361 // retry L2PmCreateAttempts times to create the instance of PM
1362 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1363 if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); resp {
1364 atLeastOneSuccess = true
1365 _ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
1366 break inner4
1367 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001368 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001369 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1370 _ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
1371 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001372 }
1373
Girish Gowdrae0140f02021-02-02 16:55:09 -08001374 default:
1375 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1376 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001377 // On success of at least one instance of the PM for a given ME, update the local list maintained for active PMs and PMs to add
1378 if atLeastOneSuccess {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001379 mm.onuMetricsManagerLock.Lock()
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001380 mm.activeL2Pms = mm.appendIfMissingString(mm.activeL2Pms, n)
1381 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
1382 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001383 mm.onuMetricsManagerLock.Unlock()
1384 } else {
Girish Gowdra0e533642021-03-02 22:02:51 -08001385 // If we are here then no instance of the PM of the given ME were created successfully, so locally disable the PM
Girish Gowdrae0140f02021-02-02 16:55:09 -08001386 // and also remove it from l2PmToAdd slice so that we do not try to create the PM ME anymore
1387 mm.onuMetricsManagerLock.Lock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001388 logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
1389 mm.groupMetricMap[n].enabled = false
1390 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001391
Girish Gowdrae0140f02021-02-02 16:55:09 -08001392 logger.Warnw(ctx, "state create pm - failed to create pm",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001393 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
Girish Gowdra0e533642021-03-02 22:02:51 -08001394 "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001395 mm.onuMetricsManagerLock.Unlock()
1396 }
1397 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001398 logger.Debugw(ctx, "state create pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001399 // Does not matter we send success or failure here.
1400 // Those PMs that we failed to create will be attempted to create again in the next PM creation cycle (assuming
1401 // we have not exceed max attempts to create the PM ME)
1402 go func() {
1403 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1404 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1405 }
1406 }()
1407}
1408
Girish Gowdra0e533642021-03-02 22:02:51 -08001409// nolint: gocyclo
Girish Gowdrae0140f02021-02-02 16:55:09 -08001410func (mm *onuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) {
1411 // Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
1412 mm.onuMetricsManagerLock.RLock()
1413 copyOfL2PmToDelete := make([]string, len(mm.l2PmToDelete))
1414 _ = copy(copyOfL2PmToDelete, mm.l2PmToDelete)
1415 mm.onuMetricsManagerLock.RUnlock()
1416
1417 logger.Debugw(ctx, "state delete pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": mm.l2PmToDelete})
1418 for _, n := range copyOfL2PmToDelete {
1419 resp := false
Girish Gowdra0e533642021-03-02 22:02:51 -08001420 cnt := 0
1421 atLeastOneDeleteFailure := false
1422
1423 // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
1424 mm.onuMetricsManagerLock.RLock()
1425 copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
1426 _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
1427 mm.onuMetricsManagerLock.RUnlock()
1428
1429 if len(copyOfEntityIDs) == 0 {
1430 // if there are no enityIDs to remove for the PM ME just clear the PM name entry from cache and continue
1431 mm.onuMetricsManagerLock.Lock()
1432 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1433 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1434 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
1435 mm.onuMetricsManagerLock.Unlock()
1436 continue
1437 }
1438 logger.Debugw(ctx, "entities to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n, "entityIDs": copyOfEntityIDs})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001439 switch n {
1440 case EthernetBridgeHistoryName:
1441 boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
1442 boolForDirection = append(boolForDirection, true, false)
1443 // Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
1444 for _, direction := range boolForDirection {
Girish Gowdra0e533642021-03-02 22:02:51 -08001445 for _, entityID := range copyOfEntityIDs {
1446 inner1:
1447 // retry L2PmDeleteAttempts times to delete the instance of PM
1448 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1449 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001450 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001451 _ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
1452 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
1453 atLeastOneDeleteFailure = true
1454 } else {
1455 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1456 break inner1
1457 }
1458 }
1459 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1460 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001461 }
1462 }
1463 }
1464 case EthernetUniHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001465 for _, entityID := range copyOfEntityIDs {
1466 inner2:
1467 // retry L2PmDeleteAttempts times to delete the instance of PM
1468 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001469 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001470 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001471 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
Girish Gowdra0e533642021-03-02 22:02:51 -08001472 atLeastOneDeleteFailure = true
1473 } else {
1474 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001475 break inner2
Girish Gowdrae0140f02021-02-02 16:55:09 -08001476 }
1477 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001478 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1479 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1480 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001481 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001482 case FecHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001483 for _, entityID := range copyOfEntityIDs {
1484 inner3:
1485 // retry L2PmDeleteAttempts times to delete the instance of PM
1486 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1487 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001488 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001489 if resp := mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
1490 atLeastOneDeleteFailure = true
1491 } else {
1492 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1493 break inner3
1494 }
1495 }
1496 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1497 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001498 }
1499 }
1500 case GemPortHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001501 for _, entityID := range copyOfEntityIDs {
1502 inner4:
1503 // retry L2PmDeleteAttempts times to delete the instance of PM
1504 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1505 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001506 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001507 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
1508 atLeastOneDeleteFailure = true
1509 } else {
1510 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1511 break inner4
1512 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001513 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001514 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1515 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1516 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001517 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001518 default:
1519 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1520 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001521 // If we could not completely clean up the PM ME then just give up.
1522 if atLeastOneDeleteFailure {
1523 logger.Warnw(ctx, "state delete pm - failed to delete at least one instance of the PM ME",
1524 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
1525 "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
1526 mm.onuMetricsManagerLock.Lock()
1527 logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
1528 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1529 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1530 mm.groupMetricMap[n].enabled = false
1531 mm.onuMetricsManagerLock.Unlock()
1532 } else { // success case
Girish Gowdrae0140f02021-02-02 16:55:09 -08001533 mm.onuMetricsManagerLock.Lock()
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001534 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1535 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1536 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001537 mm.onuMetricsManagerLock.Unlock()
1538 }
1539 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001540 logger.Debugw(ctx, "state delete pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001541 // Does not matter we send success or failure here.
1542 // Those PMs that we failed to delete will be attempted to create again in the next PM collection cycle
1543 go func() {
1544 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1545 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1546 }
1547 }()
1548}
1549
1550// ** L2 PM FSM Handlers end **
1551
1552// syncTime synchronizes time with the ONU to establish a 15 min boundary for PM collection and reporting.
1553func (mm *onuMetricsManager) syncTime(ctx context.Context) error {
Girish Gowdra0b235842021-03-09 13:06:46 -08001554 if err := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendSyncTime(ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); err != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001555 logger.Errorw(ctx, "cannot send sync time request", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1556 return err
1557 }
1558
1559 select {
Girish Gowdra0b235842021-03-09 13:06:46 -08001560 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdrae0140f02021-02-02 16:55:09 -08001561 logger.Errorf(ctx, "timed out waiting for sync time response from onu", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1562 return fmt.Errorf("timed-out-waiting-for-sync-time-response-%v", mm.pDeviceHandler.deviceID)
1563 case syncTimeRes := <-mm.syncTimeResponseChan:
1564 if !syncTimeRes {
1565 return fmt.Errorf("failed-to-sync-time-%v", mm.pDeviceHandler.deviceID)
1566 }
1567 logger.Infow(ctx, "sync time success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1568 return nil
1569 }
1570}
1571
1572func (mm *onuMetricsManager) collectEthernetFramePerformanceMonitoringHistoryData(ctx context.Context, upstream bool, entityID uint16) *voltha.MetricInformation {
1573 var mEnt *me.ManagedEntity
1574 var omciErr me.OmciErrors
1575 var classID me.ClassID
1576 var meAttributes me.AttributeValueMap
1577 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1578 meParam := me.ParamData{EntityID: entityID}
1579 if upstream {
1580 if mEnt, omciErr = me.NewEthernetFramePerformanceMonitoringHistoryDataUpstream(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1581 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1582 return nil
1583 }
1584 classID = me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID
1585 } else {
1586 if mEnt, omciErr = me.NewEthernetFramePerformanceMonitoringHistoryDataDownstream(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1587 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1588 return nil
1589 }
1590 classID = me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID
1591 }
1592
Girish Gowdrae0140f02021-02-02 16:55:09 -08001593 intervalEndTime := -1
1594 ethPMHistData := make(map[string]float32)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001595 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethPMHistData, &intervalEndTime); err != nil {
1596 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -08001597 }
1598
1599 // Populate some relevant context for the EthernetFramePerformanceMonitoringHistoryData PM
1600 ethPMHistData["class_id"] = float32(classID)
1601 ethPMHistData["interval_end_time"] = float32(intervalEndTime)
1602 ethPMHistData["parent_class_id"] = float32(me.MacBridgeConfigurationDataClassID) // EthernetFramePerformanceMonitoringHistoryData is attached to MBPCD ME
1603 ethPMHistData["parent_entity_id"] = float32(entityID)
1604 if upstream {
1605 ethPMHistData["upstream"] = float32(1)
1606 } else {
1607 ethPMHistData["upstream"] = float32(0)
1608 }
1609
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001610 metricInfo := mm.populateOnuMetricInfo(EthernetBridgeHistoryName, ethPMHistData)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001611
Girish Gowdrae0140f02021-02-02 16:55:09 -08001612 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData successful",
1613 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream, "metricInfo": metricInfo})
1614 return &metricInfo
1615}
1616
1617func (mm *onuMetricsManager) collectEthernetUniHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1618 var mEnt *me.ManagedEntity
1619 var omciErr me.OmciErrors
1620 var classID me.ClassID
1621 var meAttributes me.AttributeValueMap
1622 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1623 meParam := me.ParamData{EntityID: entityID}
1624 if mEnt, omciErr = me.NewEthernetPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1625 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1626 return nil
1627 }
1628 classID = me.EthernetPerformanceMonitoringHistoryDataClassID
1629
Girish Gowdrae0140f02021-02-02 16:55:09 -08001630 intervalEndTime := -1
1631 ethUniHistData := make(map[string]float32)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001632 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethUniHistData, &intervalEndTime); err != nil {
1633 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -08001634 }
1635
1636 // Populate some relevant context for the EthernetPerformanceMonitoringHistoryData PM
1637 ethUniHistData["class_id"] = float32(classID)
1638 ethUniHistData["interval_end_time"] = float32(intervalEndTime)
1639
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001640 metricInfo := mm.populateOnuMetricInfo(EthernetUniHistoryName, ethUniHistData)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001641
Girish Gowdrae0140f02021-02-02 16:55:09 -08001642 logger.Debugw(ctx, "collecting data for EthernetPerformanceMonitoringHistoryData successful",
1643 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1644 return &metricInfo
1645}
1646
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001647func (mm *onuMetricsManager) collectFecHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1648 var mEnt *me.ManagedEntity
1649 var omciErr me.OmciErrors
1650 var classID me.ClassID
1651 var meAttributes me.AttributeValueMap
1652 logger.Debugw(ctx, "collecting data for FecPerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1653 meParam := me.ParamData{EntityID: entityID}
1654 if mEnt, omciErr = me.NewFecPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1655 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1656 return nil
1657 }
1658 classID = me.FecPerformanceMonitoringHistoryDataClassID
1659
1660 intervalEndTime := -1
1661 fecHistData := make(map[string]float32)
1662 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, fecHistData, &intervalEndTime); err != nil {
1663 return nil
1664 }
1665
1666 // Populate some relevant context for the EthernetPerformanceMonitoringHistoryData PM
1667 fecHistData["class_id"] = float32(classID)
1668 fecHistData["interval_end_time"] = float32(intervalEndTime)
1669
1670 metricInfo := mm.populateOnuMetricInfo(FecHistoryName, fecHistData)
1671
1672 logger.Debugw(ctx, "collecting data for FecPerformanceMonitoringHistoryData successful",
1673 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1674 return &metricInfo
1675}
1676
1677func (mm *onuMetricsManager) collectGemHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1678 var mEnt *me.ManagedEntity
1679 var omciErr me.OmciErrors
1680 var classID me.ClassID
1681 var meAttributes me.AttributeValueMap
1682 logger.Debugw(ctx, "collecting data for GemPortNetworkCtpPerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1683 meParam := me.ParamData{EntityID: entityID}
1684 if mEnt, omciErr = me.NewGemPortNetworkCtpPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1685 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1686 return nil
1687 }
1688 classID = me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID
1689
1690 intervalEndTime := -1
1691 gemHistData := make(map[string]float32)
1692 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, gemHistData, &intervalEndTime); err != nil {
1693 return nil
1694 }
1695
1696 // Populate some relevant context for the GemPortNetworkCtpPerformanceMonitoringHistoryData PM
1697 gemHistData["class_id"] = float32(classID)
1698 gemHistData["interval_end_time"] = float32(intervalEndTime)
1699
1700 metricInfo := mm.populateOnuMetricInfo(GemPortHistoryName, gemHistData)
1701
1702 logger.Debugw(ctx, "collecting data for GemPortNetworkCtpPerformanceMonitoringHistoryData successful",
1703 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1704 return &metricInfo
1705}
1706
Girish Gowdrae0140f02021-02-02 16:55:09 -08001707// nolint: gocyclo
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001708func (mm *onuMetricsManager) populateEthernetBridgeHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
Girish Gowdrae0140f02021-02-02 16:55:09 -08001709 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMHistData map[string]float32, intervalEndTime *int) error {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001710 upstream := false
1711 if classID == me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID {
1712 upstream = true
1713 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001714 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1715 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1716 requestedAttributes["IntervalEndTime"] = 0
1717 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001718 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001719 select {
1720 case meAttributes = <-mm.l2PmChan:
1721 logger.Debugw(ctx, "received ethernet pm history data metrics",
1722 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "upstream": upstream, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001723 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdrae0140f02021-02-02 16:55:09 -08001724 logger.Errorw(ctx, "timeout waiting for omci-get response for ethernet pm history data",
1725 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "upstream": upstream, "entityID": entityID})
1726 // The metrics will be empty in this case
1727 return fmt.Errorf("timeout-during-l2-pm-collection-for-ethernet-bridge-history-%v", mm.pDeviceHandler.deviceID)
1728 }
1729 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001730 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1731 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-ethernet-bridge-history-%v", mm.pDeviceHandler.deviceID)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001732 }
1733 }
1734 for k := range EthernetBridgeHistory {
1735 // populate ethPMHistData only if metric key not already present (or populated), since it is possible that we populate
1736 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1737 if _, ok := ethPMHistData[k]; !ok {
1738 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001739 case "entity_id":
1740 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1741 ethPMHistData[k] = float32(val.(uint16))
1742 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001743 case "drop_events":
1744 if val, ok := meAttributes["DropEvents"]; ok && val != nil {
1745 ethPMHistData[k] = float32(val.(uint32))
1746 }
1747 case "octets":
1748 if val, ok := meAttributes["Octets"]; ok && val != nil {
1749 ethPMHistData[k] = float32(val.(uint32))
1750 }
1751 case "packets":
1752 if val, ok := meAttributes["Packets"]; ok && val != nil {
1753 ethPMHistData[k] = float32(val.(uint32))
1754 }
1755 case "broadcast_packets":
1756 if val, ok := meAttributes["BroadcastPackets"]; ok && val != nil {
1757 ethPMHistData[k] = float32(val.(uint32))
1758 }
1759 case "multicast_packets":
1760 if val, ok := meAttributes["MulticastPackets"]; ok && val != nil {
1761 ethPMHistData[k] = float32(val.(uint32))
1762 }
1763 case "crc_errored_packets":
1764 if val, ok := meAttributes["CrcErroredPackets"]; ok && val != nil {
1765 ethPMHistData[k] = float32(val.(uint32))
1766 }
1767 case "undersize_packets":
1768 if val, ok := meAttributes["UndersizePackets"]; ok && val != nil {
1769 ethPMHistData[k] = float32(val.(uint32))
1770 }
1771 case "oversize_packets":
1772 if val, ok := meAttributes["OversizePackets"]; ok && val != nil {
1773 ethPMHistData[k] = float32(val.(uint32))
1774 }
1775 case "64_octets":
1776 if val, ok := meAttributes["Packets64Octets"]; ok && val != nil {
1777 ethPMHistData[k] = float32(val.(uint32))
1778 }
1779 case "65_to_127_octets":
1780 if val, ok := meAttributes["Packets65To127Octets"]; ok && val != nil {
1781 ethPMHistData[k] = float32(val.(uint32))
1782 }
1783 case "128_to_255_octets":
1784 if val, ok := meAttributes["Packets128To255Octets"]; ok && val != nil {
1785 ethPMHistData[k] = float32(val.(uint32))
1786 }
1787 case "256_to_511_octets":
1788 if val, ok := meAttributes["Packets256To511Octets"]; ok && val != nil {
1789 ethPMHistData[k] = float32(val.(uint32))
1790 }
1791 case "512_to_1023_octets":
1792 if val, ok := meAttributes["Packets512To1023Octets"]; ok && val != nil {
1793 ethPMHistData[k] = float32(val.(uint32))
1794 }
1795 case "1024_to_1518_octets":
1796 if val, ok := meAttributes["Packets1024To1518Octets"]; ok && val != nil {
1797 ethPMHistData[k] = float32(val.(uint32))
1798 }
1799 default:
1800 // do nothing
1801 }
1802 }
1803 }
1804 return nil
1805}
1806
1807// nolint: gocyclo
1808func (mm *onuMetricsManager) populateEthernetUniHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1809 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMUniHistData map[string]float32, intervalEndTime *int) error {
1810 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1811 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1812 requestedAttributes["IntervalEndTime"] = 0
1813 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001814 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001815 select {
1816 case meAttributes = <-mm.l2PmChan:
1817 logger.Debugw(ctx, "received ethernet uni history data metrics",
1818 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001819 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdrae0140f02021-02-02 16:55:09 -08001820 logger.Errorw(ctx, "timeout waiting for omci-get response for ethernet uni history data",
1821 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1822 // The metrics will be empty in this case
1823 return fmt.Errorf("timeout-during-l2-pm-collection-for-ethernet-uni-history-%v", mm.pDeviceHandler.deviceID)
1824 }
1825 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001826 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1827 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-ethernet-uni-history-%v", mm.pDeviceHandler.deviceID)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001828 }
1829 }
1830 for k := range EthernetUniHistory {
1831 // populate ethPMUniHistData only if metric key not already present (or populated), since it is possible that we populate
1832 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1833 if _, ok := ethPMUniHistData[k]; !ok {
1834 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001835 case "entity_id":
1836 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1837 ethPMUniHistData[k] = float32(val.(uint16))
1838 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001839 case "fcs_errors":
1840 if val, ok := meAttributes["FcsErrors"]; ok && val != nil {
1841 ethPMUniHistData[k] = float32(val.(uint32))
1842 }
1843 case "excessive_collision_counter":
1844 if val, ok := meAttributes["ExcessiveCollisionCounter"]; ok && val != nil {
1845 ethPMUniHistData[k] = float32(val.(uint32))
1846 }
1847 case "late_collision_counter":
1848 if val, ok := meAttributes["LateCollisionCounter"]; ok && val != nil {
1849 ethPMUniHistData[k] = float32(val.(uint32))
1850 }
1851 case "frames_too_long":
1852 if val, ok := meAttributes["FramesTooLong"]; ok && val != nil {
1853 ethPMUniHistData[k] = float32(val.(uint32))
1854 }
1855 case "buffer_overflows_on_rx":
1856 if val, ok := meAttributes["BufferOverflowsOnReceive"]; ok && val != nil {
1857 ethPMUniHistData[k] = float32(val.(uint32))
1858 }
1859 case "buffer_overflows_on_tx":
1860 if val, ok := meAttributes["BufferOverflowsOnTransmit"]; ok && val != nil {
1861 ethPMUniHistData[k] = float32(val.(uint32))
1862 }
1863 case "single_collision_frame_counter":
1864 if val, ok := meAttributes["SingleCollisionFrameCounter"]; ok && val != nil {
1865 ethPMUniHistData[k] = float32(val.(uint32))
1866 }
1867 case "multiple_collisions_frame_counter":
1868 if val, ok := meAttributes["MultipleCollisionsFrameCounter"]; ok && val != nil {
1869 ethPMUniHistData[k] = float32(val.(uint32))
1870 }
1871 case "sqe_counter":
1872 if val, ok := meAttributes["SqeCounter"]; ok && val != nil {
1873 ethPMUniHistData[k] = float32(val.(uint32))
1874 }
1875 case "deferred_tx_counter":
1876 if val, ok := meAttributes["DeferredTransmissionCounter"]; ok && val != nil {
1877 ethPMUniHistData[k] = float32(val.(uint32))
1878 }
1879 case "internal_mac_tx_error_counter":
1880 if val, ok := meAttributes["InternalMacTransmitErrorCounter"]; ok && val != nil {
1881 ethPMUniHistData[k] = float32(val.(uint32))
1882 }
1883 case "carrier_sense_error_counter":
1884 if val, ok := meAttributes["CarrierSenseErrorCounter"]; ok && val != nil {
1885 ethPMUniHistData[k] = float32(val.(uint32))
1886 }
1887 case "alignment_error_counter":
1888 if val, ok := meAttributes["AlignmentErrorCounter"]; ok && val != nil {
1889 ethPMUniHistData[k] = float32(val.(uint32))
1890 }
1891 case "internal_mac_rx_error_counter":
1892 if val, ok := meAttributes["InternalMacReceiveErrorCounter"]; ok && val != nil {
1893 ethPMUniHistData[k] = float32(val.(uint32))
1894 }
1895 default:
1896 // do nothing
1897 }
1898 }
1899 }
1900 return nil
1901}
1902
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001903// nolint: gocyclo
1904func (mm *onuMetricsManager) populateFecHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1905 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, fecHistData map[string]float32, intervalEndTime *int) error {
1906 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1907 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1908 requestedAttributes["IntervalEndTime"] = 0
1909 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001910 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001911 select {
1912 case meAttributes = <-mm.l2PmChan:
1913 logger.Debugw(ctx, "received fec history data metrics",
1914 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001915 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001916 logger.Errorw(ctx, "timeout waiting for omci-get response for fec history data",
1917 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1918 // The metrics will be empty in this case
1919 return fmt.Errorf("timeout-during-l2-pm-collection-for-fec-history-%v", mm.pDeviceHandler.deviceID)
1920 }
1921 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
1922 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1923 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-fec-history-%v", mm.pDeviceHandler.deviceID)
1924 }
1925 }
1926 for k := range FecHistory {
1927 // populate fecHistData only if metric key not already present (or populated), since it is possible that we populate
1928 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1929 if _, ok := fecHistData[k]; !ok {
1930 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001931 case "entity_id":
1932 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1933 fecHistData[k] = float32(val.(uint16))
1934 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001935 case "corrected_bytes":
1936 if val, ok := meAttributes["CorrectedBytes"]; ok && val != nil {
1937 fecHistData[k] = float32(val.(uint32))
1938 }
1939 case "corrected_code_words":
1940 if val, ok := meAttributes["CorrectedCodeWords"]; ok && val != nil {
1941 fecHistData[k] = float32(val.(uint32))
1942 }
1943 case "uncorrectable_code_words":
1944 if val, ok := meAttributes["UncorrectableCodeWords"]; ok && val != nil {
1945 fecHistData[k] = float32(val.(uint32))
1946 }
1947 case "total_code_words":
1948 if val, ok := meAttributes["TotalCodeWords"]; ok && val != nil {
1949 fecHistData[k] = float32(val.(uint32))
1950 }
1951 case "fec_seconds":
1952 if val, ok := meAttributes["FecSeconds"]; ok && val != nil {
1953 fecHistData[k] = float32(val.(uint16))
1954 }
1955 default:
1956 // do nothing
1957 }
1958 }
1959 }
1960 return nil
1961}
1962
1963// nolint: gocyclo
1964func (mm *onuMetricsManager) populateGemPortMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1965 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, gemPortHistData map[string]float32, intervalEndTime *int) error {
1966 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1967 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1968 requestedAttributes["IntervalEndTime"] = 0
1969 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001970 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001971 select {
1972 case meAttributes = <-mm.l2PmChan:
1973 logger.Debugw(ctx, "received gem port history data metrics",
1974 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001975 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001976 logger.Errorw(ctx, "timeout waiting for omci-get response for gem port history data",
1977 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1978 // The metrics will be empty in this case
1979 return fmt.Errorf("timeout-during-l2-pm-collection-for-gemport-history-%v", mm.pDeviceHandler.deviceID)
1980 }
1981 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
1982 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1983 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-gemport-history-%v", mm.pDeviceHandler.deviceID)
1984 }
1985 }
1986 for k := range GemPortHistory {
1987 // populate gemPortHistData only if metric key not already present (or populated), since it is possible that we populate
1988 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1989 if _, ok := gemPortHistData[k]; !ok {
1990 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001991 case "entity_id":
1992 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1993 gemPortHistData[k] = float32(val.(uint16))
1994 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001995 case "transmitted_gem_frames":
1996 if val, ok := meAttributes["TransmittedGemFrames"]; ok && val != nil {
1997 gemPortHistData[k] = float32(val.(uint32))
1998 }
1999 case "received_gem_frames":
2000 if val, ok := meAttributes["ReceivedGemFrames"]; ok && val != nil {
2001 gemPortHistData[k] = float32(val.(uint32))
2002 }
2003 case "received_payload_bytes":
2004 if val, ok := meAttributes["ReceivedPayloadBytes"]; ok && val != nil {
2005 gemPortHistData[k] = float32(val.(uint64))
2006 }
2007 case "transmitted_payload_bytes":
2008 if val, ok := meAttributes["TransmittedPayloadBytes"]; ok && val != nil {
2009 gemPortHistData[k] = float32(val.(uint64))
2010 }
2011 case "encryption_key_errors":
2012 if val, ok := meAttributes["EncryptionKeyErrors"]; ok && val != nil {
2013 gemPortHistData[k] = float32(val.(uint32))
2014 }
2015 default:
2016 // do nothing
2017 }
2018 }
2019 }
2020 return nil
2021}
2022
Girish Gowdrae0140f02021-02-02 16:55:09 -08002023func (mm *onuMetricsManager) handleOmciCreateResponseMessage(ctx context.Context, msg OmciMessage) error {
2024 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeCreateResponse)
2025 if msgLayer == nil {
2026 logger.Errorw(ctx, "omci Msg layer could not be detected for create response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2027 return fmt.Errorf("omci Msg layer could not be detected for create response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2028 }
2029 msgObj, msgOk := msgLayer.(*omci.CreateResponse)
2030 if !msgOk {
2031 logger.Errorw(ctx, "omci Msg layer could not be assigned for create response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2032 return fmt.Errorf("omci Msg layer could not be assigned for delete response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2033 }
2034 logger.Debugw(ctx, "OMCI create response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
2035 switch msgObj.EntityClass {
2036 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
2037 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002038 me.EthernetPerformanceMonitoringHistoryDataClassID,
2039 me.FecPerformanceMonitoringHistoryDataClassID,
2040 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -08002041 // If the result is me.InstanceExists it means the entity was already created. It is ok handled that as success
2042 if msgObj.Result == me.Success || msgObj.Result == me.InstanceExists {
2043 mm.l2PmCreateOrDeleteResponseChan <- true
2044 } else {
2045 logger.Warnw(ctx, "failed to create me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2046 mm.l2PmCreateOrDeleteResponseChan <- false
2047 }
2048 return nil
2049 default:
2050 logger.Errorw(ctx, "unhandled omci create response message",
2051 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2052 }
2053 return fmt.Errorf("unhandled-omci-create-response-message-%v", mm.pDeviceHandler.deviceID)
2054}
2055
2056func (mm *onuMetricsManager) handleOmciDeleteResponseMessage(ctx context.Context, msg OmciMessage) error {
2057 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeDeleteResponse)
2058 if msgLayer == nil {
2059 logger.Errorw(ctx, "omci Msg layer could not be detected for delete response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2060 return fmt.Errorf("omci Msg layer could not be detected for create response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2061 }
2062 msgObj, msgOk := msgLayer.(*omci.DeleteResponse)
2063 if !msgOk {
2064 logger.Errorw(ctx, "omci Msg layer could not be assigned for delete response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2065 return fmt.Errorf("omci Msg layer could not be assigned for delete response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2066 }
2067 logger.Debugw(ctx, "OMCI delete response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
2068 switch msgObj.EntityClass {
2069 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
2070 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002071 me.EthernetPerformanceMonitoringHistoryDataClassID,
2072 me.FecPerformanceMonitoringHistoryDataClassID,
2073 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -08002074 // If the result is me.UnknownInstance it means the entity was already deleted. It is ok handled that as success
2075 if msgObj.Result == me.Success || msgObj.Result == me.UnknownInstance {
2076 mm.l2PmCreateOrDeleteResponseChan <- true
2077 } else {
2078 logger.Warnw(ctx, "failed to delete me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2079 mm.l2PmCreateOrDeleteResponseChan <- false
2080 }
2081 return nil
2082 default:
2083 logger.Errorw(ctx, "unhandled omci delete response message",
2084 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2085 }
2086 return fmt.Errorf("unhandled-omci-delete-response-message-%v", mm.pDeviceHandler.deviceID)
2087}
2088
2089func (mm *onuMetricsManager) generateTicks(ctx context.Context) {
2090 for {
2091 select {
2092 case <-time.After(L2PmCollectionInterval * time.Second):
2093 go func() {
2094 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventTick); err != nil {
2095 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
2096 }
2097 }()
2098 case <-mm.stopTicks:
2099 logger.Infow(ctx, "stopping ticks", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2100 return
2101 }
2102 }
2103}
2104
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002105func (mm *onuMetricsManager) handleMetricsPublish(ctx context.Context, metricName string, metricInfoSlice []*voltha.MetricInformation) {
2106 // Publish metrics if it is valid
2107 if metricInfoSlice != nil {
2108 mm.publishMetrics(ctx, metricInfoSlice)
2109 } else {
2110 // If collectAttempts exceeds L2PmCollectAttempts then remove it from activeL2Pms
2111 // slice so that we do not collect data from that PM ME anymore
2112 mm.onuMetricsManagerLock.Lock()
2113 mm.groupMetricMap[metricName].collectAttempts++
2114 if mm.groupMetricMap[metricName].collectAttempts > L2PmCollectAttempts {
2115 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, metricName)
2116 }
2117 logger.Warnw(ctx, "state collect data - no metrics collected",
2118 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": metricName, "collectAttempts": mm.groupMetricMap[metricName].collectAttempts})
2119 mm.onuMetricsManagerLock.Unlock()
2120 }
2121}
2122
2123func (mm *onuMetricsManager) populateGroupSpecificMetrics(ctx context.Context, mEnt *me.ManagedEntity, classID me.ClassID, entityID uint16,
2124 meAttributes me.AttributeValueMap, data map[string]float32, intervalEndTime *int) error {
2125 var grpFunc groupMetricPopulateFunc
2126 switch classID {
2127 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID, me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID:
2128 grpFunc = mm.populateEthernetBridgeHistoryMetrics
2129 case me.EthernetPerformanceMonitoringHistoryDataClassID:
2130 grpFunc = mm.populateEthernetUniHistoryMetrics
2131 case me.FecPerformanceMonitoringHistoryDataClassID:
2132 grpFunc = mm.populateFecHistoryMetrics
2133 case me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
2134 grpFunc = mm.populateGemPortMetrics
2135 default:
2136 return fmt.Errorf("unknown-classid-%v", classID)
2137 }
2138
2139 size := 0
2140 requestedAttributes := make(me.AttributeValueMap)
2141 for _, v := range mEnt.GetAttributeDefinitions() {
2142 if (v.Size + size) <= MaxL2PMGetPayLoadSize {
2143 requestedAttributes[v.Name] = v.DefValue
2144 size = v.Size + size
2145 } else { // We exceeded the allow omci get size
2146 // Let's collect the attributes via get now and collect remaining in the next iteration
2147 if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
2148 logger.Errorw(ctx, "error during metric collection",
2149 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "err": err})
2150 return err
2151 }
2152 size = 0 // reset size
2153 requestedAttributes = make(me.AttributeValueMap) // reset map
2154 }
2155 }
2156 // Collect the omci get attributes for the last bunch of attributes.
2157 if len(requestedAttributes) > 0 {
2158 if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
2159 logger.Errorw(ctx, "error during metric collection",
2160 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "err": err})
2161 return err
2162 }
2163 }
2164 return nil
2165}
2166
2167func (mm *onuMetricsManager) populateOnuMetricInfo(title string, data map[string]float32) voltha.MetricInformation {
2168 metricsContext := make(map[string]string)
2169 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
2170 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
2171 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
2172
2173 raisedTs := time.Now().Unix()
2174 mmd := voltha.MetricMetaData{
2175 Title: title,
2176 Ts: float64(raisedTs),
2177 Context: metricsContext,
2178 DeviceId: mm.pDeviceHandler.deviceID,
2179 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
2180 SerialNo: mm.pDeviceHandler.device.SerialNumber,
2181 }
2182
2183 // create slice of metrics given that there could be more than one VEIP instance
2184 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: data}
2185 return metricInfo
2186}
2187
2188func (mm *onuMetricsManager) updateAndValidateIntervalEndTime(ctx context.Context, entityID uint16, meAttributes me.AttributeValueMap, intervalEndTime *int) bool {
2189 valid := false
2190 if *intervalEndTime == -1 { // first time
2191 // Update the interval end time
2192 if val, ok := meAttributes["IntervalEndTime"]; ok && val != nil {
2193 *intervalEndTime = int(meAttributes["IntervalEndTime"].(uint8))
2194 valid = true
2195 }
2196 } else {
2197 var currIntervalEndTime int
2198 if val, ok := meAttributes["IntervalEndTime"]; ok && val != nil {
2199 currIntervalEndTime = int(meAttributes["IntervalEndTime"].(uint8))
2200 }
2201 if currIntervalEndTime != *intervalEndTime { // interval end time changed during metric collection
2202 logger.Errorw(ctx, "interval end time changed during metrics collection for ethernet pm history data",
2203 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID,
2204 "currIntervalEndTime": *intervalEndTime, "newIntervalEndTime": currIntervalEndTime})
2205 } else {
2206 valid = true
2207 }
2208 }
2209 return valid
2210}
2211
2212func (mm *onuMetricsManager) waitForResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassName string) bool {
2213 logger.Debugw(ctx, "waitForResponseOrTimeout", log.Fields{"create": create, "instID": instID, "meClassName": meClassName})
2214 select {
2215 case resp := <-mm.l2PmCreateOrDeleteResponseChan:
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002216 logger.Debugw(ctx, "received l2 pm me response",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002217 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "resp": resp, "create": create, "meClassName": meClassName, "instID": instID})
2218 return resp
Girish Gowdra0b235842021-03-09 13:06:46 -08002219 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002220 logger.Errorw(ctx, "timeout waiting for l2 pm me response",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002221 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "resp": false, "create": create, "meClassName": meClassName, "instID": instID})
2222 }
2223 return false
2224}
2225
2226func (mm *onuMetricsManager) initializeGroupMetric(grpMtrcs map[string]voltha.PmConfig_PmType, grpName string, grpEnabled bool, grpFreq uint32) {
2227 var pmConfigSlice []*voltha.PmConfig
2228 for k, v := range grpMtrcs {
Girish Gowdra0e533642021-03-02 22:02:51 -08002229 pmConfigSlice = append(pmConfigSlice,
2230 &voltha.PmConfig{
2231 Name: k,
2232 Type: v,
2233 Enabled: grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
2234 SampleFreq: grpFreq})
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002235 }
2236 groupMetric := voltha.PmGroupConfig{
2237 GroupName: grpName,
2238 Enabled: grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
2239 GroupFreq: grpFreq,
2240 Metrics: pmConfigSlice,
2241 }
2242 mm.pDeviceHandler.pmConfigs.Groups = append(mm.pDeviceHandler.pmConfigs.Groups, &groupMetric)
2243
2244}
2245
2246func (mm *onuMetricsManager) initializeL2PmFsm(ctx context.Context, aCommChannel chan Message) error {
2247 mm.pAdaptFsm = NewAdapterFsm("L2PmFSM", mm.pDeviceHandler.deviceID, aCommChannel)
2248 if mm.pAdaptFsm == nil {
2249 logger.Errorw(ctx, "L2PMFsm AdapterFsm could not be instantiated!!", log.Fields{
2250 "device-id": mm.pDeviceHandler.deviceID})
2251 return fmt.Errorf("nil-adapter-fsm")
2252 }
2253 // L2 PM FSM related state machine
2254 mm.pAdaptFsm.pFsm = fsm.NewFSM(
2255 l2PmStNull,
2256 fsm.Events{
2257 {Name: l2PmEventInit, Src: []string{l2PmStNull}, Dst: l2PmStStarting},
2258 {Name: l2PmEventTick, Src: []string{l2PmStStarting}, Dst: l2PmStSyncTime},
2259 {Name: l2PmEventTick, Src: []string{l2PmStIdle, l2PmEventDeleteMe, l2PmEventAddMe}, Dst: l2PmStCollectData},
2260 {Name: l2PmEventSuccess, Src: []string{l2PmStSyncTime, l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStIdle},
2261 {Name: l2PmEventFailure, Src: []string{l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStIdle},
2262 {Name: l2PmEventFailure, Src: []string{l2PmStSyncTime}, Dst: l2PmStSyncTime},
2263 {Name: l2PmEventAddMe, Src: []string{l2PmStIdle}, Dst: l2PmStCreatePmMe},
2264 {Name: l2PmEventDeleteMe, Src: []string{l2PmStIdle}, Dst: l2PmStDeletePmMe},
2265 {Name: l2PmEventStop, Src: []string{l2PmStNull, l2PmStStarting, l2PmStSyncTime, l2PmStIdle, l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStNull},
2266 },
2267 fsm.Callbacks{
2268 "enter_state": func(e *fsm.Event) { mm.pAdaptFsm.logFsmStateChange(ctx, e) },
2269 "enter_" + l2PmStNull: func(e *fsm.Event) { mm.l2PMFsmNull(ctx, e) },
2270 "enter_" + l2PmStIdle: func(e *fsm.Event) { mm.l2PMFsmIdle(ctx, e) },
2271 "enter_" + l2PmStStarting: func(e *fsm.Event) { mm.l2PMFsmStarting(ctx, e) },
2272 "enter_" + l2PmStSyncTime: func(e *fsm.Event) { mm.l2PMFsmSyncTime(ctx, e) },
2273 "enter_" + l2PmStCollectData: func(e *fsm.Event) { mm.l2PmFsmCollectData(ctx, e) },
2274 "enter_" + l2PmStCreatePmMe: func(e *fsm.Event) { mm.l2PmFsmCreatePM(ctx, e) },
2275 "enter_" + l2PmStDeletePmMe: func(e *fsm.Event) { mm.l2PmFsmDeletePM(ctx, e) },
2276 },
2277 )
2278 return nil
2279}
2280
2281func (mm *onuMetricsManager) initializeAllGroupMetrics() {
2282 mm.pDeviceHandler.pmConfigs = &voltha.PmConfigs{}
2283 mm.pDeviceHandler.pmConfigs.Id = mm.pDeviceHandler.deviceID
2284 mm.pDeviceHandler.pmConfigs.DefaultFreq = DefaultMetricCollectionFrequency
2285 mm.pDeviceHandler.pmConfigs.Grouped = GroupMetricEnabled
2286 mm.pDeviceHandler.pmConfigs.FreqOverride = DefaultFrequencyOverrideEnabled
2287
2288 // Populate group metrics.
2289 // Lets populate irrespective of GroupMetricEnabled is true or not.
2290 // The group metrics collection will decided on this flag later
2291
2292 mm.initializeGroupMetric(OpticalPowerGroupMetrics, OpticalPowerGroupMetricName,
2293 OpticalPowerGroupMetricEnabled, OpticalPowerMetricGroupCollectionFrequency)
2294
2295 mm.initializeGroupMetric(UniStatusGroupMetrics, UniStatusGroupMetricName,
2296 UniStatusGroupMetricEnabled, UniStatusMetricGroupCollectionFrequency)
2297
2298 // classical l2 pm counter start
2299
2300 mm.initializeGroupMetric(EthernetBridgeHistory, EthernetBridgeHistoryName,
2301 EthernetBridgeHistoryEnabled, EthernetBridgeHistoryFrequency)
2302
2303 mm.initializeGroupMetric(EthernetUniHistory, EthernetUniHistoryName,
2304 EthernetUniHistoryEnabled, EthernetUniHistoryFrequency)
2305
2306 mm.initializeGroupMetric(FecHistory, FecHistoryName,
2307 FecHistoryEnabled, FecHistoryFrequency)
2308
2309 mm.initializeGroupMetric(GemPortHistory, GemPortHistoryName,
2310 GemPortHistoryEnabled, GemPortHistoryFrequency)
2311
2312 // classical l2 pm counter end
2313
2314 // Add standalone metric (if present) after this (will be added to dh.pmConfigs.Metrics)
2315}
2316
2317func (mm *onuMetricsManager) populateLocalGroupMetricData(ctx context.Context) {
2318 // Populate local group metric structures
2319 for _, g := range mm.pDeviceHandler.pmConfigs.Groups {
2320 mm.groupMetricMap[g.GroupName] = &groupMetric{
2321 groupName: g.GroupName,
2322 enabled: g.Enabled,
2323 frequency: g.GroupFreq,
2324 }
2325 switch g.GroupName {
2326 case OpticalPowerGroupMetricName:
2327 mm.groupMetricMap[g.GroupName].metricMap = OpticalPowerGroupMetrics
2328 case UniStatusGroupMetricName:
2329 mm.groupMetricMap[g.GroupName].metricMap = UniStatusGroupMetrics
2330 case EthernetBridgeHistoryName:
2331 mm.groupMetricMap[g.GroupName].metricMap = EthernetBridgeHistory
2332 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2333 case EthernetUniHistoryName:
2334 mm.groupMetricMap[g.GroupName].metricMap = EthernetUniHistory
2335 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2336 case FecHistoryName:
2337 mm.groupMetricMap[g.GroupName].metricMap = FecHistory
2338 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2339 case GemPortHistoryName:
2340 mm.groupMetricMap[g.GroupName].metricMap = GemPortHistory
2341 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2342 default:
2343 logger.Errorw(ctx, "unhandled-group-name", log.Fields{"groupName": g.GroupName})
2344 }
2345 }
2346
2347 // Populate local standalone metric structures
2348 for _, m := range mm.pDeviceHandler.pmConfigs.Metrics {
2349 mm.standaloneMetricMap[m.Name] = &standaloneMetric{
2350 metricName: m.Name,
2351 enabled: m.Enabled,
2352 frequency: m.SampleFreq,
2353 }
2354 switch m.Name {
2355 // None exist as of now. Add when available.
2356 default:
2357 logger.Errorw(ctx, "unhandled-metric-name", log.Fields{"metricName": m.Name})
2358 }
2359 }
2360}
2361
2362func (mm *onuMetricsManager) AddGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
2363 mm.onuMetricsManagerLock.Lock()
2364 defer mm.onuMetricsManagerLock.Unlock()
2365 // mark the instance for addition
Girish Gowdra0e533642021-03-02 22:02:51 -08002366 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002367 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002368 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002369
2370 mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, GemPortHistoryName)
2371 // We do not need to remove from l2PmToDelete slice as we could have Add and Delete of
2372 // GemPortPerfHistory ME simultaneously for different instances of the ME.
2373 // The creation or deletion of an instance is decided based on its presence in gemPortNCTPPerfHistInstToDelete or
2374 // gemPortNCTPPerfHistInstToAdd slice
2375}
2376
2377func (mm *onuMetricsManager) RemoveGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
2378 mm.onuMetricsManagerLock.Lock()
2379 defer mm.onuMetricsManagerLock.Unlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08002380 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002381 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002382 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002383
2384 mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, GemPortHistoryName)
2385 // We do not need to remove from l2PmToAdd slice as we could have Add and Delete of
2386 // GemPortPerfHistory ME simultaneously for different instances of the ME.
2387 // The creation or deletion of an instance is decided based on its presence in gemPortNCTPPerfHistInstToDelete or
2388 // gemPortNCTPPerfHistInstToAdd slice
2389}
2390
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002391func (mm *onuMetricsManager) updateGemPortNTPInstanceToAddForPerfMonitoring(ctx context.Context) {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002392 if mm.pDeviceHandler.pOnuTP != nil {
2393 gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002394 // NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002395 for _, v := range gemPortInstIDs {
2396 // mark the instance for addition
Girish Gowdra0e533642021-03-02 22:02:51 -08002397 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002398 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002399 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002400 }
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002401 logger.Debugw(ctx, "updateGemPortNTPInstanceToAddForPerfMonitoring",
Girish Gowdra0e533642021-03-02 22:02:51 -08002402 log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002403 }
2404}
2405
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002406func (mm *onuMetricsManager) updateGemPortNTPInstanceToDeleteForPerfMonitoring(ctx context.Context) {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002407 if mm.pDeviceHandler.pOnuTP != nil {
2408 gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002409 // NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002410 for _, v := range gemPortInstIDs {
Girish Gowdra0e533642021-03-02 22:02:51 -08002411 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002412 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002413 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002414 }
2415 }
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002416 logger.Debugw(ctx, "updateGemPortNTPInstanceToDeleteForPerfMonitoring",
Girish Gowdra0e533642021-03-02 22:02:51 -08002417 log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
2418}
2419
2420// restorePmData restores any PM data available on the KV store to local cache
2421func (mm *onuMetricsManager) restorePmData(ctx context.Context) error {
2422 logger.Debugw(ctx, "restorePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2423 if mm.pmKvStore == nil {
2424 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2425 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2426 }
2427 var errorsList []error
2428 for groupName, group := range mm.groupMetricMap {
2429 group.pmMEData = &pmMEData{}
2430 Value, err := mm.pmKvStore.Get(ctx, groupName)
2431 if err == nil {
2432 if Value != nil {
2433 logger.Debugw(ctx, "PM data read",
2434 log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
2435 tmpBytes, _ := kvstore.ToByte(Value.Value)
2436
2437 if err = json.Unmarshal(tmpBytes, &group.pmMEData); err != nil {
2438 logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
2439 errorsList = append(errorsList, fmt.Errorf(fmt.Sprintf("unable-to-unmarshal-PM-data-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName)))
2440 continue
2441 }
2442 logger.Debugw(ctx, "restorePmData - success", log.Fields{"pmData": group.pmMEData, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2443 } else {
2444 logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2445 continue
2446 }
2447 } else {
2448 logger.Errorw(ctx, "restorePmData - fail", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "err": err})
2449 errorsList = append(errorsList, fmt.Errorf(fmt.Sprintf("unable-to-read-from-KVstore-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName)))
2450 continue
2451 }
2452 }
2453 if len(errorsList) > 0 {
2454 return fmt.Errorf("errors-restoring-pm-data-for-one-or-more-groups--errors:%v", errorsList)
2455 }
2456 logger.Debugw(ctx, "restorePmData - complete success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2457 return nil
2458}
2459
2460// getPmData gets pmMEData from cache. Since we have write through cache implementation for pmMEData,
2461// the data must be available in cache.
2462// Note, it is expected that caller of this function manages the required synchronization (like using locks etc.).
2463func (mm *onuMetricsManager) getPmData(ctx context.Context, groupName string) (*pmMEData, error) {
2464 if grp, ok := mm.groupMetricMap[groupName]; ok {
2465 return grp.pmMEData, nil
2466 }
2467 // Data not in cache, try to fetch from kv store.
2468 data := &pmMEData{}
2469 if mm.pmKvStore == nil {
2470 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2471 return data, fmt.Errorf("pmKvStore not set. device-id - %s", mm.pDeviceHandler.deviceID)
2472 }
2473 Value, err := mm.pmKvStore.Get(ctx, groupName)
2474 if err == nil {
2475 if Value != nil {
2476 logger.Debugw(ctx, "PM data read",
2477 log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
2478 tmpBytes, _ := kvstore.ToByte(Value.Value)
2479
2480 if err = json.Unmarshal(tmpBytes, data); err != nil {
2481 logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
2482 return data, err
2483 }
2484 logger.Debugw(ctx, "PM data", log.Fields{"pmData": data, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2485 } else {
2486 logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2487 return data, err
2488 }
2489 } else {
2490 logger.Errorw(ctx, "unable to read from KVstore", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2491 return data, err
2492 }
2493
2494 return data, nil
2495}
2496
2497// updatePmData update pmMEData to store. It is write through cache, i.e., write to cache first and then update store
2498func (mm *onuMetricsManager) updatePmData(ctx context.Context, groupName string, meInstanceID uint16, pmAction string) error {
2499 logger.Debugw(ctx, "updatePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "entityID": meInstanceID, "pmAction": pmAction})
2500 mm.onuMetricsManagerLock.Lock()
2501 defer mm.onuMetricsManagerLock.Unlock()
2502
2503 if mm.pmKvStore == nil {
2504 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2505 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2506 }
2507
2508 pmMEData, err := mm.getPmData(ctx, groupName)
2509 if err != nil || pmMEData == nil {
2510 // error already logged in called function.
2511 return err
2512 }
2513 switch pmAction {
2514 case cPmAdd:
2515 pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(pmMEData.InstancesToAdd, meInstanceID)
2516 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2517 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2518 case cPmAdded:
2519 pmMEData.InstancesActive = mm.appendIfMissingUnt16(pmMEData.InstancesActive, meInstanceID)
2520 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2521 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2522 case cPmRemove:
2523 pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(pmMEData.InstancesToDelete, meInstanceID)
2524 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2525 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2526 case cPmRemoved:
2527 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2528 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2529 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2530 default:
2531 logger.Errorw(ctx, "unknown pm action", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pmAction": pmAction, "groupName": groupName})
2532 return fmt.Errorf(fmt.Sprintf("unknown-pm-action-deviceid-%s-groupName-%s-pmaction-%s", mm.pDeviceHandler.deviceID, groupName, pmAction))
2533 }
2534 // write through cache
2535 mm.groupMetricMap[groupName].pmMEData = pmMEData
2536
2537 Value, err := json.Marshal(*pmMEData)
2538 if err != nil {
2539 logger.Errorw(ctx, "unable to marshal PM data", log.Fields{"groupName": groupName, "pmAction": pmAction, "pmData": *pmMEData, "err": err})
2540 return err
2541 }
2542 // Update back to kv store
2543 if err = mm.pmKvStore.Put(ctx, groupName, Value); err != nil {
2544 logger.Errorw(ctx, "unable to put PM data to kv store", log.Fields{"groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction, "err": err})
2545 return err
2546 }
2547 logger.Debugw(ctx, "updatePmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction})
2548
2549 return nil
2550}
2551
2552// clearPmGroupData cleans PM Group data from store
2553func (mm *onuMetricsManager) clearPmGroupData(ctx context.Context) error {
2554 mm.onuMetricsManagerLock.Lock()
2555 defer mm.onuMetricsManagerLock.Unlock()
2556 logger.Debugw(ctx, "clearPmGroupData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2557 if mm.pmKvStore == nil {
2558 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2559 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2560 }
2561
2562 for n := range mm.groupMetricMap {
2563 if err := mm.pmKvStore.Delete(ctx, n); err != nil {
2564 logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "groupName": n, "err": err})
2565 // do not abort this procedure. continue to delete next group.
2566 } else {
2567 logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": n})
2568 }
2569 }
2570
2571 return nil
2572}
2573
2574// clearAllPmData clears all PM data associated with the device from KV store
2575func (mm *onuMetricsManager) clearAllPmData(ctx context.Context) error {
2576 mm.onuMetricsManagerLock.Lock()
2577 defer mm.onuMetricsManagerLock.Unlock()
2578 logger.Debugw(ctx, "clearAllPmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2579 if mm.pmKvStore == nil {
2580 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2581 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2582 }
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +00002583 var value error
2584 for n := range mm.groupMetricMap {
2585 if err := mm.pmKvStore.Delete(ctx, n); err != nil {
2586 logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "groupName": n, "err": err})
2587 value = err
2588 // do not abort this procedure - continue to delete next group.
2589 } else {
2590 logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": n})
2591 }
Girish Gowdra0e533642021-03-02 22:02:51 -08002592 }
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +00002593 if value == nil {
2594 logger.Debugw(ctx, "clearAllPmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2595 }
2596 return value
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002597}
2598
2599func (mm *onuMetricsManager) appendIfMissingString(slice []string, n string) []string {
Girish Gowdrae0140f02021-02-02 16:55:09 -08002600 for _, ele := range slice {
2601 if ele == n {
2602 return slice
2603 }
2604 }
2605 return append(slice, n)
2606}
2607
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002608func (mm *onuMetricsManager) removeIfFoundString(slice []string, n string) []string {
2609 for i, ele := range slice {
2610 if ele == n {
2611 return append(slice[:i], slice[i+1:]...)
2612 }
2613 }
2614 return slice
2615}
2616
2617func (mm *onuMetricsManager) appendIfMissingUnt16(slice []uint16, n uint16) []uint16 {
2618 for _, ele := range slice {
2619 if ele == n {
2620 return slice
2621 }
2622 }
2623 return append(slice, n)
2624}
2625
2626func (mm *onuMetricsManager) removeIfFoundUint16(slice []uint16, n uint16) []uint16 {
Girish Gowdrae0140f02021-02-02 16:55:09 -08002627 for i, ele := range slice {
2628 if ele == n {
2629 return append(slice[:i], slice[i+1:]...)
2630 }
2631 }
2632 return slice
Girish Gowdrae09a6202021-01-12 18:10:59 -08002633}
Girish Gowdrae20a4f62021-03-09 16:06:23 -08002634
2635func (mm *onuMetricsManager) twosComplementToSignedInt16(val uint16) int16 {
2636 var uint16MsbMask uint16 = 0x8000
2637 if val&uint16MsbMask == uint16MsbMask {
2638 return int16(^val+1) * -1
2639 }
2640
2641 return int16(val)
2642}
2643
/* // These are needed in the future
2645
2646func (mm *onuMetricsManager) twosComplementToSignedInt32(val uint32) int32 {
2647 var uint32MsbMask uint32 = 0x80000000
2648 if val & uint32MsbMask == uint32MsbMask {
2649 return int32(^val + 1) * -1
2650 }
2651
2652 return int32(val)
2653}
2654
2655func (mm *onuMetricsManager) twosComplementToSignedInt64(val uint64) int64 {
2656 var uint64MsbMask uint64 = 0x8000000000000000
2657 if val & uint64MsbMask == uint64MsbMask {
2658 return int64(^val + 1) * -1
2659 }
2660
2661 return int64(val)
2662}
2663
2664*/