blob: 7d2e78790172473ee6c4453f8def2054280038ac [file] [log] [blame]
Girish Gowdrae09a6202021-01-12 18:10:59 -08001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
Girish Gowdra0e533642021-03-02 22:02:51 -080022 "encoding/json"
Girish Gowdrae09a6202021-01-12 18:10:59 -080023 "fmt"
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +000024 "math"
25 "sync"
26 "time"
27
Girish Gowdrae0140f02021-02-02 16:55:09 -080028 "github.com/looplab/fsm"
Girish Gowdrae09a6202021-01-12 18:10:59 -080029 "github.com/opencord/omci-lib-go"
30 me "github.com/opencord/omci-lib-go/generated"
Girish Gowdra0e533642021-03-02 22:02:51 -080031 "github.com/opencord/voltha-lib-go/v4/pkg/db"
32 "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
Girish Gowdrae09a6202021-01-12 18:10:59 -080033 "github.com/opencord/voltha-lib-go/v4/pkg/log"
34 "github.com/opencord/voltha-protos/v4/go/voltha"
Girish Gowdrae09a6202021-01-12 18:10:59 -080035)
36
// Names used by the L2 PM FSM: first the events that drive it, then the states it moves through.
const (
	// events of the L2 PM FSM
	l2PmEventInit     = "l2PmEventInit"
	l2PmEventTick     = "l2PmEventTick"
	l2PmEventSuccess  = "l2PmEventSuccess"
	l2PmEventFailure  = "l2PmEventFailure"
	l2PmEventAddMe    = "l2PmEventAddMe"
	l2PmEventDeleteMe = "l2PmEventDeleteMe"
	l2PmEventStop     = "l2PmEventStop"

	// states of the L2 PM FSM
	l2PmStNull        = "l2PmStNull"
	l2PmStStarting    = "l2PmStStarting"
	l2PmStSyncTime    = "l2PmStSyncTime"
	l2PmStIdle        = "l2PmStIdle"
	l2PmStCreatePmMe  = "l2PmStCreatePm" // NOTE(review): the value differs from the identifier name; kept unchanged
	l2PmStDeletePmMe  = "l2PmStDeletePmMe"
	l2PmStCollectData = "l2PmStCollectData"
)

// cL2PmFsmIdleState is the state the L2 PM FSM idles in between activities.
const cL2PmFsmIdleState = l2PmStIdle
59
Girish Gowdra5a7c4922021-01-22 18:33:41 -080060// general constants used for overall Metric Collection management
61const (
62 DefaultMetricCollectionFrequency = 15 * 60 // unit in seconds. This setting can be changed from voltha NBI PmConfig configuration
63 GroupMetricEnabled = true // This is READONLY and cannot be changed from VOLTHA NBI
64 DefaultFrequencyOverrideEnabled = true // This is READONLY and cannot be changed from VOLTHA NBI
65 FrequencyGranularity = 5 // The frequency (in seconds) has to be multiple of 5. This setting cannot changed later.
66)
67
68// OpticalPowerGroupMetrics are supported optical pm names
69var OpticalPowerGroupMetrics = map[string]voltha.PmConfig_PmType{
Girish Gowdrae20a4f62021-03-09 16:06:23 -080070 "ani_g_instance_id": voltha.PmConfig_CONTEXT,
71 "transmit_power_dBm": voltha.PmConfig_GAUGE,
72 "receive_power_dBm": voltha.PmConfig_GAUGE,
Girish Gowdra5a7c4922021-01-22 18:33:41 -080073}
74
75// OpticalPowerGroupMetrics specific constants
76const (
Girish Gowdrae0140f02021-02-02 16:55:09 -080077 OpticalPowerGroupMetricName = "PON_Optical"
Girish Gowdra5a7c4922021-01-22 18:33:41 -080078 OpticalPowerGroupMetricEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
79 OpticalPowerMetricGroupCollectionFrequency = 5 * 60 // unit in seconds. This setting can be changed from voltha NBI PmConfig configuration
80)
81
82// UniStatusGroupMetrics are supported UNI status names
83var UniStatusGroupMetrics = map[string]voltha.PmConfig_PmType{
84 "uni_port_no": voltha.PmConfig_CONTEXT,
Girish Gowdrada3a52f2021-03-17 11:24:11 -070085 "me_class_id": voltha.PmConfig_CONTEXT,
Girish Gowdra0e533642021-03-02 22:02:51 -080086 "entity_id": voltha.PmConfig_CONTEXT,
Girish Gowdrada3a52f2021-03-17 11:24:11 -070087 "sensed_type": voltha.PmConfig_GAUGE,
Girish Gowdra5a7c4922021-01-22 18:33:41 -080088 "oper_status": voltha.PmConfig_GAUGE,
89 "uni_admin_state": voltha.PmConfig_GAUGE,
90}
91
92// UniStatusGroupMetrics specific constants
93const (
Girish Gowdrae0140f02021-02-02 16:55:09 -080094 UniStatusGroupMetricName = "UNI_Status"
Girish Gowdra5a7c4922021-01-22 18:33:41 -080095 UniStatusGroupMetricEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
96 UniStatusMetricGroupCollectionFrequency = 5 * 60 // unit in seconds. This setting can be changed from voltha NBI PmConfig configuration
97)
98
Girish Gowdrae0140f02021-02-02 16:55:09 -080099// *** Classical L2 PM Counters begin ***
100
101// EthernetBridgeHistory are supported ethernet bridge history counters fetched from
102// Ethernet Frame Performance Monitoring History Data Downstream and Ethernet Frame Performance Monitoring History Data Upstream MEs.
103var EthernetBridgeHistory = map[string]voltha.PmConfig_PmType{
104 "class_id": voltha.PmConfig_CONTEXT,
105 "entity_id": voltha.PmConfig_CONTEXT,
106 "interval_end_time": voltha.PmConfig_CONTEXT,
107 "parent_class_id": voltha.PmConfig_CONTEXT,
108 "parent_entity_id": voltha.PmConfig_CONTEXT,
109 "upstream": voltha.PmConfig_CONTEXT,
110
111 "drop_events": voltha.PmConfig_COUNTER,
112 "octets": voltha.PmConfig_COUNTER,
113 "packets": voltha.PmConfig_COUNTER,
114 "broadcast_packets": voltha.PmConfig_COUNTER,
115 "multicast_packets": voltha.PmConfig_COUNTER,
116 "crc_errored_packets": voltha.PmConfig_COUNTER,
117 "undersize_packets": voltha.PmConfig_COUNTER,
118 "oversize_packets": voltha.PmConfig_COUNTER,
119 "64_octets": voltha.PmConfig_COUNTER,
120 "65_to_127_octets": voltha.PmConfig_COUNTER,
121 "128_to_255_octets": voltha.PmConfig_COUNTER,
122 "256_to_511_octets": voltha.PmConfig_COUNTER,
123 "512_to_1023_octets": voltha.PmConfig_COUNTER,
124 "1024_to_1518_octets": voltha.PmConfig_COUNTER,
125}
126
127// EthernetUniHistory are supported ethernet uni history counters fetched from
128// Ethernet Performance Monitoring History Data ME.
129var EthernetUniHistory = map[string]voltha.PmConfig_PmType{
130 "class_id": voltha.PmConfig_CONTEXT,
131 "entity_id": voltha.PmConfig_CONTEXT,
132 "interval_end_time": voltha.PmConfig_CONTEXT,
133
134 "fcs_errors": voltha.PmConfig_COUNTER,
135 "excessive_collision_counter": voltha.PmConfig_COUNTER,
136 "late_collision_counter": voltha.PmConfig_COUNTER,
137 "frames_too_long": voltha.PmConfig_COUNTER,
138 "buffer_overflows_on_rx": voltha.PmConfig_COUNTER,
139 "buffer_overflows_on_tx": voltha.PmConfig_COUNTER,
140 "single_collision_frame_counter": voltha.PmConfig_COUNTER,
141 "multiple_collisions_frame_counter": voltha.PmConfig_COUNTER,
142 "sqe_counter": voltha.PmConfig_COUNTER,
143 "deferred_tx_counter": voltha.PmConfig_COUNTER,
144 "internal_mac_tx_error_counter": voltha.PmConfig_COUNTER,
145 "carrier_sense_error_counter": voltha.PmConfig_COUNTER,
146 "alignment_error_counter": voltha.PmConfig_COUNTER,
147 "internal_mac_rx_error_counter": voltha.PmConfig_COUNTER,
148}
149
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800150// FecHistory is supported FEC Performance Monitoring History Data related metrics
151var FecHistory = map[string]voltha.PmConfig_PmType{
152 "class_id": voltha.PmConfig_CONTEXT,
153 "entity_id": voltha.PmConfig_CONTEXT,
154 "interval_end_time": voltha.PmConfig_CONTEXT,
155
156 "corrected_bytes": voltha.PmConfig_COUNTER,
157 "corrected_code_words": voltha.PmConfig_COUNTER,
158 "uncorrectable_code_words": voltha.PmConfig_COUNTER,
159 "total_code_words": voltha.PmConfig_COUNTER,
160 "fec_seconds": voltha.PmConfig_COUNTER,
161}
162
163// GemPortHistory is supported GEM Port Network Ctp Performance Monitoring History Data
164// related metrics
165var GemPortHistory = map[string]voltha.PmConfig_PmType{
166 "class_id": voltha.PmConfig_CONTEXT,
167 "entity_id": voltha.PmConfig_CONTEXT,
168 "interval_end_time": voltha.PmConfig_CONTEXT,
169
170 "transmitted_gem_frames": voltha.PmConfig_COUNTER,
171 "received_gem_frames": voltha.PmConfig_COUNTER,
172 "received_payload_bytes": voltha.PmConfig_COUNTER,
173 "transmitted_payload_bytes": voltha.PmConfig_COUNTER,
174 "encryption_key_errors": voltha.PmConfig_COUNTER,
175}
176
Girish Gowdrae0140f02021-02-02 16:55:09 -0800177// Constants specific for L2 PM collection
178const (
179 L2PmCollectionInterval = 15 * 60 // Unit in seconds. Do not change this as this fixed by OMCI specification for L2 PM counters
180 SyncTimeRetryInterval = 15 // Unit seconds
181 L2PmCreateAttempts = 3
Girish Gowdra36ccf7d2021-02-25 20:42:51 -0800182 L2PmDeleteAttempts = 3
Girish Gowdrae0140f02021-02-02 16:55:09 -0800183 L2PmCollectAttempts = 3
Girish Gowdra453750f2021-02-16 16:36:46 -0800184 // Per Table 11.2.9-1 – OMCI baseline message limitations in G.988 spec, the max GET Response
185 // payload size is 25. We define 24 (one less) to allow for dynamic insertion of IntervalEndTime
186 // attribute (1 byte) in L2 PM GET Requests.
187 MaxL2PMGetPayLoadSize = 24
Girish Gowdrae0140f02021-02-02 16:55:09 -0800188)
189
190// EthernetUniHistoryName specific constants
191const (
192 EthernetBridgeHistoryName = "Ethernet_Bridge_Port_History"
193 EthernetBridgeHistoryEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
194 EthernetBridgeHistoryFrequency = L2PmCollectionInterval
195)
196
197// EthernetBridgeHistory specific constants
198const (
199 EthernetUniHistoryName = "Ethernet_UNI_History"
200 EthernetUniHistoryEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
201 EthernetUniHistoryFrequency = L2PmCollectionInterval
202)
203
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800204// FecHistory specific constants
205const (
206 FecHistoryName = "FEC_History"
207 FecHistoryEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
208 FecHistoryFrequency = L2PmCollectionInterval
209)
210
211// GemPortHistory specific constants
212const (
213 GemPortHistoryName = "GEM_Port_History"
214 GemPortHistoryEnabled = true // This setting can be changed from voltha NBI PmConfig configuration
215 GemPortHistoryFrequency = L2PmCollectionInterval
216)
217
Girish Gowdra0e533642021-03-02 22:02:51 -0800218// KV Store related constants
219const (
220 cPmKvStorePrefix = "%s/openonu/pm-data/%s" // <some-base-path>/openonu/pm-data/<onu-device-id>
221 cPmAdd = "add"
222 cPmAdded = "added"
223 cPmRemove = "remove"
224 cPmRemoved = "removed"
225)
226
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800227// Defines the type for generic metric population function
228type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int) error
229
Girish Gowdrae0140f02021-02-02 16:55:09 -0800230// *** Classical L2 PM Counters end ***
231
Girish Gowdra0e533642021-03-02 22:02:51 -0800232type pmMEData struct {
233 InstancesActive []uint16 `json:"instances_active"` // list of active ME instance IDs for the group
234 InstancesToDelete []uint16 `json:"instances_to_delete"` // list of ME instance IDs marked for deletion for the group
235 InstancesToAdd []uint16 `json:"instances_to_add"` // list of ME instance IDs marked for addition for the group
236}
237
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800238type groupMetric struct {
239 groupName string
240 enabled bool
241 frequency uint32 // valid only if FrequencyOverride is enabled.
242 metricMap map[string]voltha.PmConfig_PmType
243 nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
Girish Gowdrae0140f02021-02-02 16:55:09 -0800244 isL2PMCounter bool // true for only L2 PM counters
245 collectAttempts uint32 // number of attempts to collect L2 PM data
Girish Gowdra0e533642021-03-02 22:02:51 -0800246 pmMEData *pmMEData
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800247}
248
249type standaloneMetric struct {
250 metricName string
251 enabled bool
252 frequency uint32 // valid only if FrequencyOverride is enabled.
253 nextCollectionInterval time.Time // valid only if FrequencyOverride is enabled.
254}
255
Girish Gowdrae09a6202021-01-12 18:10:59 -0800256type onuMetricsManager struct {
257 pDeviceHandler *deviceHandler
Girish Gowdrae0140f02021-02-02 16:55:09 -0800258 pAdaptFsm *AdapterFsm
Girish Gowdrae09a6202021-01-12 18:10:59 -0800259
Girish Gowdrae0140f02021-02-02 16:55:09 -0800260 opticalMetricsChan chan me.AttributeValueMap
261 uniStatusMetricsChan chan me.AttributeValueMap
262 l2PmChan chan me.AttributeValueMap
263 syncTimeResponseChan chan bool // true is success, false is fail
264 l2PmCreateOrDeleteResponseChan chan bool // true is success, false is fail
265
266 activeL2Pms []string // list of active l2 pm MEs created on the ONU.
267 l2PmToDelete []string // list of L2 PMs to delete
268 l2PmToAdd []string // list of L2 PM to add
Girish Gowdrae09a6202021-01-12 18:10:59 -0800269
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800270 groupMetricMap map[string]*groupMetric
271 standaloneMetricMap map[string]*standaloneMetric
272
Girish Gowdrae09a6202021-01-12 18:10:59 -0800273 stopProcessingOmciResponses chan bool
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -0700274 omciProcessingActive bool
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800275
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -0700276 stopTicks chan bool
277 tickGenerationActive bool
Girish Gowdrae0140f02021-02-02 16:55:09 -0800278
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800279 nextGlobalMetricCollectionTime time.Time // valid only if pmConfig.FreqOverride is set to false.
280
281 onuMetricsManagerLock sync.RWMutex
Girish Gowdra0e533642021-03-02 22:02:51 -0800282
283 pmKvStore *db.Backend
Girish Gowdrae09a6202021-01-12 18:10:59 -0800284}
285
// newonuMetricsManager returns a new instance of the onuMetricsManager.
// The metrics manager module is responsible for configuration and management of individual and group metrics.
// Currently all the metrics are managed as a group which fall into two categories - L2 PM and "all others".
// The L2 PM counters have a fixed 15min interval for PM collection while all other group counters have
// the collection interval configurable.
// The global PM config is part of the voltha.Device struct and is backed up on KV store (by rw-core).
// This module also implements resiliency for L2 PM ME instances that are active/pending-delete/pending-add.
//
// It returns nil if the L2 PM FSM cannot be initialized or if no KV store backend can be set up
// for the PM data; both failures are logged.
func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
	var metricsManager onuMetricsManager
	logger.Debugw(ctx, "init-onuMetricsManager", log.Fields{"device-id": dh.deviceID})
	metricsManager.pDeviceHandler = dh

	commMetricsChan := make(chan Message)
	metricsManager.opticalMetricsChan = make(chan me.AttributeValueMap)
	metricsManager.uniStatusMetricsChan = make(chan me.AttributeValueMap)
	metricsManager.l2PmChan = make(chan me.AttributeValueMap)

	metricsManager.syncTimeResponseChan = make(chan bool)
	metricsManager.l2PmCreateOrDeleteResponseChan = make(chan bool)

	metricsManager.stopProcessingOmciResponses = make(chan bool)
	metricsManager.stopTicks = make(chan bool)

	metricsManager.groupMetricMap = make(map[string]*groupMetric)
	metricsManager.standaloneMetricMap = make(map[string]*standaloneMetric)

	// dh.pmConfigs is NOT nil if the adapter comes back from a restart; do not go back to defaults in that case.
	if dh.pmConfigs == nil {
		metricsManager.initializeAllGroupMetrics()
	}

	metricsManager.populateLocalGroupMetricData(ctx)

	if err := metricsManager.initializeL2PmFsm(ctx, commMetricsChan); err != nil {
		// Previously this failure was silent; surface it so a missing metrics manager can be diagnosed.
		logger.Errorw(ctx, "init-onuMetricsManager-l2-pm-fsm-failed", log.Fields{"device-id": dh.deviceID, "err": err})
		return nil
	}

	// initialize the next metric collection intervals.
	metricsManager.initializeMetricCollectionTime(ctx)

	baseKvStorePath := fmt.Sprintf(cPmKvStorePrefix, dh.pOpenOnuAc.cm.Backend.PathPrefix, dh.deviceID)
	metricsManager.pmKvStore = dh.setBackend(ctx, baseKvStorePath)
	if metricsManager.pmKvStore == nil {
		logger.Errorw(ctx, "Can't initialize pmKvStore - no backend connection to PM module",
			log.Fields{"device-id": dh.deviceID, "service": baseKvStorePath})
		return nil
	}

	// Infow (not Info) is the structured variant that accepts log.Fields.
	logger.Infow(ctx, "init-onuMetricsManager completed", log.Fields{"device-id": dh.deviceID})
	return &metricsManager
}
337
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800338func (mm *onuMetricsManager) initializeMetricCollectionTime(ctx context.Context) {
339 if mm.pDeviceHandler.pmConfigs.FreqOverride {
340 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to true, then group/standalone metric specific interval applies
341 mm.onuMetricsManagerLock.Lock()
342 defer mm.onuMetricsManagerLock.Unlock()
343 for _, v := range mm.groupMetricMap {
Girish Gowdrae0140f02021-02-02 16:55:09 -0800344 if v.enabled && !v.isL2PMCounter { // L2 PM counter collection is managed in a L2PmFsm
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800345 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
346 }
347 }
348
349 for _, v := range mm.standaloneMetricMap {
350 if v.enabled {
351 v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
352 }
353 }
354 } else {
355 // If mm.pDeviceHandler.pmConfigs.FreqOverride is set to false, then overall metric specific interval applies
356 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
357 }
358 logger.Infow(ctx, "initialized standalone group/metric collection time", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
359}
360
361func (mm *onuMetricsManager) updateDefaultFrequency(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
362 // Verify that the configured DefaultFrequency is > 0 and is a multiple of FrequencyGranularity
Girish Gowdraaf0ad632021-01-27 13:00:01 -0800363 if pmConfigs.DefaultFreq == 0 || (pmConfigs.DefaultFreq > 0 && pmConfigs.DefaultFreq%FrequencyGranularity != 0) {
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800364 logger.Errorf(ctx, "frequency-%u-should-be-a-multiple-of-%u", pmConfigs.DefaultFreq, FrequencyGranularity)
365 return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", pmConfigs.DefaultFreq, FrequencyGranularity)
366 }
367 mm.pDeviceHandler.pmConfigs.DefaultFreq = pmConfigs.DefaultFreq
368 // re-set the nextGlobalMetricCollectionTime based on the new DefaultFreq
369 mm.nextGlobalMetricCollectionTime = time.Now().Add(time.Duration(mm.pDeviceHandler.pmConfigs.DefaultFreq) * time.Second)
370 logger.Debugw(ctx, "frequency-updated--new-frequency", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "frequency": mm.pDeviceHandler.pmConfigs.DefaultFreq})
371 return nil
372}
373
// updateGroupFreq applies the collection frequency configured for group aGroupName in pmConfigs.
// The frequency must be non-zero and a multiple of FrequencyGranularity. L2 PM counter groups are
// rejected because their 15 minute interval is fixed by the OMCI specification. On success the
// internal PM config is updated and the group's next collection time is re-armed from now.
func (mm *onuMetricsManager) updateGroupFreq(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
	var newGroupFreq uint32
	found := false
	groupSliceIdx := 0
	for idx, group := range pmConfigs.Groups {
		if group.GroupName != aGroupName {
			continue
		}
		// freq 0 is not allowed and it should be multiple of FrequencyGranularity
		if group.GroupFreq == 0 || group.GroupFreq%FrequencyGranularity != 0 {
			logger.Errorf(ctx, "frequency-%d-should-be-a-multiple-of-%d", group.GroupFreq, FrequencyGranularity)
			return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", group.GroupFreq, FrequencyGranularity)
		}
		newGroupFreq = group.GroupFreq
		groupSliceIdx = idx
		found = true
		break
	}
	if !found {
		logger.Errorw(ctx, "group name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
		return fmt.Errorf("group-name-not-found-%v", aGroupName)
	}

	mm.onuMetricsManagerLock.Lock()
	defer mm.onuMetricsManagerLock.Unlock()
	v, ok := mm.groupMetricMap[aGroupName]
	// We cannot allow the L2 PM counter frequency to be updated. It is 15min fixed by OMCI spec
	if !ok || v.isL2PMCounter {
		logger.Errorw(ctx, "group frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
		return fmt.Errorf("internal-error-during-group-freq-update--groupname-%s-freq-%d", aGroupName, newGroupFreq)
	}
	v.frequency = newGroupFreq
	// update internal pm config
	// NOTE(review): assumes pmConfigs.Groups and the internal pmConfigs.Groups share ordering — confirm.
	mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].GroupFreq = newGroupFreq
	// Also update the next group metric collection time from now
	v.nextCollectionInterval = time.Now().Add(time.Duration(newGroupFreq) * time.Second)
	logger.Infow(ctx, "group frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newGroupFreq": newGroupFreq, "groupName": aGroupName})
	return nil
}
418
// updateMetricFreq applies the sample frequency configured for the standalone metric aMetricName
// in pmConfigs. The frequency must be non-zero and a multiple of FrequencyGranularity. On success
// the internal PM config is updated and the metric's next collection time is re-armed from now.
//
// The metric is looked up in standaloneMetricMap. The original code searched groupMetricMap,
// which holds groups rather than standalone metrics, so the update could never succeed.
func (mm *onuMetricsManager) updateMetricFreq(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
	var newMetricFreq uint32
	found := false
	metricSliceIdx := 0
	for idx, metric := range pmConfigs.Metrics {
		if metric.Name != aMetricName {
			continue
		}
		// freq 0 is not allowed and it should be multiple of FrequencyGranularity
		if metric.SampleFreq == 0 || metric.SampleFreq%FrequencyGranularity != 0 {
			logger.Errorf(ctx, "frequency-%d-should-be-a-multiple-of-%d", metric.SampleFreq, FrequencyGranularity)
			return fmt.Errorf("frequency-%d-should-be-a-multiple-of-%d", metric.SampleFreq, FrequencyGranularity)
		}
		newMetricFreq = metric.SampleFreq
		metricSliceIdx = idx
		found = true
		break
	}
	if !found {
		logger.Errorw(ctx, "metric name not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
		return fmt.Errorf("metric-name-not-found-%v", aMetricName)
	}

	mm.onuMetricsManagerLock.Lock()
	defer mm.onuMetricsManagerLock.Unlock()
	v, ok := mm.standaloneMetricMap[aMetricName]
	if !ok {
		logger.Errorw(ctx, "metric frequency not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
		return fmt.Errorf("internal-error-during-standalone-metric-update--metricname-%s-freq-%d", aMetricName, newMetricFreq)
	}
	v.frequency = newMetricFreq
	// update internal pm config
	// NOTE(review): assumes pmConfigs.Metrics and the internal pmConfigs.Metrics share ordering — confirm.
	mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].SampleFreq = newMetricFreq
	// Also update the next standalone metric collection time from now
	v.nextCollectionInterval = time.Now().Add(time.Duration(newMetricFreq) * time.Second)
	logger.Infow(ctx, "metric frequency updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "newMetricFreq": newMetricFreq, "aMetricName": aMetricName})
	return nil
}
462
// updateGroupSupport enables or disables group aGroupName according to its Enabled flag in
// pmConfigs, and mirrors the flag into the device's internal PM config.
//
// For L2 PM counter groups the change is queued: the group is appended to l2PmToAdd or
// l2PmToDelete, and removed from the opposite list in case the flag toggled too soon. The
// GemPortHistory group, whose instance IDs are not pre-defined, additionally calls
// updateGemPortNTPInstanceToAddForPerfMonitoring or updateGemPortNTPInstanceToDeleteForPerfMonitoring.
// A non-L2 group that becomes enabled while FreqOverride is set gets its next collection time
// re-armed from now.
//
// An error is returned in three cases: the group is absent from pmConfigs, the group is absent
// from groupMetricMap, or the enabled state does not change.
func (mm *onuMetricsManager) updateGroupSupport(ctx context.Context, aGroupName string, pmConfigs *voltha.PmConfigs) error {
	// Record the match explicitly. After an unsuccessful range loop the loop variable still points
	// at the last element, so a nil check on it cannot detect "not found".
	groupSliceIdx := 0
	var group *voltha.PmGroupConfig
	for idx, candidate := range pmConfigs.Groups {
		if candidate.GroupName == aGroupName {
			groupSliceIdx, group = idx, candidate
			break
		}
	}
	if group == nil {
		logger.Errorw(ctx, "group metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
		return fmt.Errorf("group-not-found--groupName-%s", aGroupName)
	}

	mm.onuMetricsManagerLock.Lock()
	defer mm.onuMetricsManagerLock.Unlock()
	v, ok := mm.groupMetricMap[aGroupName]
	if !ok || v.enabled == group.Enabled {
		logger.Errorw(ctx, "group metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName})
		return fmt.Errorf("internal-error-during-group-support-update--groupName-%s", aGroupName)
	}

	mm.pDeviceHandler.pmConfigs.Groups[groupSliceIdx].Enabled = group.Enabled
	v.enabled = group.Enabled
	if group.Enabled {
		if v.isL2PMCounter {
			// If it is a L2 PM counter we need to mark the PM to be added
			mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, v.groupName)
			// If the group support flag toggles too soon, we need to delete the group name from l2PmToDelete slice
			mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, v.groupName)

			// The GemPortHistory group requires some special handling as the instance IDs are not pre-defined
			// unlike other L2 PM counters. We need to fetch the active gemport instance IDs in the system to
			// take further action
			if v.groupName == GemPortHistoryName {
				mm.updateGemPortNTPInstanceToAddForPerfMonitoring(ctx)
			}
		} else if mm.pDeviceHandler.pmConfigs.FreqOverride { // otherwise just update the next collection interval
			v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
		}
	} else if v.isL2PMCounter { // group counter is disabled
		// If it is a L2 PM counter we need to mark the PM to be deleted
		mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, v.groupName)
		// If the group support flag toggles too soon, we need to delete the group name from l2PmToAdd slice
		mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, v.groupName)

		// The GemPortHistory group requires some special handling as the instance IDs are not pre-defined
		// unlike other L2 PM counters. We need to fetch the active gemport instance IDs in the system to
		// take further action
		if v.groupName == GemPortHistoryName {
			mm.updateGemPortNTPInstanceToDeleteForPerfMonitoring(ctx)
		}
	}

	if v.isL2PMCounter {
		logger.Infow(ctx, "l2 pm group metric support updated",
			log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName, "enabled": group.Enabled, "l2PmToAdd": mm.l2PmToAdd, "l2PmToDelete": mm.l2PmToDelete})
	} else {
		logger.Infow(ctx, "group metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": aGroupName, "enabled": group.Enabled})
	}
	return nil
}
532
// updateMetricSupport enables or disables the standalone metric aMetricName according to its
// Enabled flag in pmConfigs, and mirrors the flag into the device's internal PM config. When the
// metric becomes enabled while FreqOverride is set, its next collection time is re-armed from now.
//
// An error is returned in three cases: the metric is absent from pmConfigs, the metric is absent
// from standaloneMetricMap, or the enabled state does not change.
func (mm *onuMetricsManager) updateMetricSupport(ctx context.Context, aMetricName string, pmConfigs *voltha.PmConfigs) error {
	// Record the match explicitly. After an unsuccessful range loop the loop variable still points
	// at the last element, so a nil check on it cannot detect "not found".
	metricSliceIdx := 0
	var metric *voltha.PmConfig
	for idx, candidate := range pmConfigs.Metrics {
		if candidate.Name == aMetricName {
			metricSliceIdx, metric = idx, candidate
			break
		}
	}
	if metric == nil {
		logger.Errorw(ctx, "standalone metric not found", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
		return fmt.Errorf("metric-not-found--metricname-%s", aMetricName)
	}

	mm.onuMetricsManagerLock.Lock()
	defer mm.onuMetricsManagerLock.Unlock()
	v, ok := mm.standaloneMetricMap[aMetricName]
	if !ok || v.enabled == metric.Enabled {
		logger.Errorw(ctx, "standalone metric support not updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName})
		return fmt.Errorf("internal-error-during-standalone-support-update--metricname-%s", aMetricName)
	}

	mm.pDeviceHandler.pmConfigs.Metrics[metricSliceIdx].Enabled = metric.Enabled
	v.enabled = metric.Enabled
	// If the standalone metric is now enabled and frequency override is enabled, set the next metric collection time
	if metric.Enabled && mm.pDeviceHandler.pmConfigs.FreqOverride {
		v.nextCollectionInterval = time.Now().Add(time.Duration(v.frequency) * time.Second)
	}
	logger.Infow(ctx, "standalone metric support updated", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": aMetricName, "enabled": metric.Enabled})
	return nil
}
570
// collectAllGroupAndStandaloneMetrics starts one asynchronous collection pass. It runs the group
// collector when pmConfigs.Grouped is set, and the standalone collector otherwise.
func (mm *onuMetricsManager) collectAllGroupAndStandaloneMetrics(ctx context.Context) {
	collect := mm.collectAllStandaloneMetrics
	if mm.pDeviceHandler.pmConfigs.Grouped { // metrics are managed as a group.
		collect = mm.collectAllGroupMetrics
	}
	go collect(ctx)
}
578
// collectAllGroupMetrics launches one goroutine per supported metric group (optical power and UNI
// status). Each goroutine publishes its group's metrics only when the collector returns a non-nil
// result.
func (mm *onuMetricsManager) collectAllGroupMetrics(ctx context.Context) {
	// launch runs a single group collector asynchronously and publishes its result.
	launch := func(debugMsg string, collect func(context.Context) []*voltha.MetricInformation) {
		go func() {
			logger.Debug(ctx, debugMsg)
			if info := collect(ctx); info != nil {
				mm.publishMetrics(ctx, info)
			}
		}()
	}

	launch("startCollector before collecting optical metrics", mm.collectOpticalMetrics)
	launch("startCollector before collecting uni metrics", mm.collectUniStatusMetrics)

	// Add more here
}
598
// collectAllStandaloneMetrics is the collection entry point used when metrics are not grouped.
// No standalone metrics are implemented yet, so it currently does nothing.
func (mm *onuMetricsManager) collectAllStandaloneMetrics(ctx context.Context) {
	// Intentionally empty: add standalone metric collection here once such metrics exist.
}
602
// collectGroupMetric asynchronously collects and publishes the metrics of a single group.
// Only the optical power and UNI status groups are handled; any other name is logged as an error.
//
// The collector result is now checked (mi != nil) before publishing. The original tested the
// receiver (mm != nil), which is always true here, so a nil result was published too. For
// example, collectOpticalMetrics returns nil when its group is disabled.
func (mm *onuMetricsManager) collectGroupMetric(ctx context.Context, groupName string) {
	switch groupName {
	case OpticalPowerGroupMetricName:
		go func() {
			if mi := mm.collectOpticalMetrics(ctx); mi != nil {
				mm.publishMetrics(ctx, mi)
			}
		}()
	case UniStatusGroupMetricName:
		go func() {
			if mi := mm.collectUniStatusMetrics(ctx); mi != nil {
				mm.publishMetrics(ctx, mi)
			}
		}()
	default:
		logger.Errorw(ctx, "unhandled group metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName})
	}
}
621
// collectStandaloneMetric collects a single standalone metric by name. No standalone metrics
// exist yet, so every name is reported as unhandled.
func (mm *onuMetricsManager) collectStandaloneMetric(ctx context.Context, metricName string) {
	// Dispatch on metricName here once standalone metrics are added.
	logger.Errorw(ctx, "unhandled standalone metric name", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": metricName})
}
629
// collectOpticalMetrics collects the optical power group metrics from every ANI-G ME
// instance recorded in the ONU MIB. For each instance an OMCI Get is sent, and the
// reply is awaited on opticalMetricsChan, which handleOmciGetResponseMessage feeds.
// One MetricInformation entry is produced per instance that answered.
//
// It returns nil when the optical power group is disabled. An instance whose Get
// could not be sent is skipped, consistent with the UNI-G handling in
// collectUniStatusMetrics. A reply timeout stops collection, and the entries gathered
// for earlier instances are still returned.
func (mm *onuMetricsManager) collectOpticalMetrics(ctx context.Context) []*voltha.MetricInformation {
	logger.Debugw(ctx, "collectOpticalMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})

	mm.onuMetricsManagerLock.RLock()
	if !mm.groupMetricMap[OpticalPowerGroupMetricName].enabled {
		mm.onuMetricsManagerLock.RUnlock()
		logger.Debugw(ctx, "optical power group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		return nil
	}
	mm.onuMetricsManagerLock.RUnlock()

	var metricInfoSlice []*voltha.MetricInformation
	metricsContext := make(map[string]string)
	metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
	metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
	metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType

	raisedTs := time.Now().Unix()
	mmd := voltha.MetricMetaData{
		Title:           OpticalPowerGroupMetricName,
		Ts:              float64(raisedTs),
		Context:         metricsContext,
		DeviceId:        mm.pDeviceHandler.deviceID,
		LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
		SerialNo:        mm.pDeviceHandler.device.SerialNumber,
	}

	// toDBm interprets a raw ANI-G optical level as a two's-complement value, scales it
	// by 1/500 to dBm and rounds the result to a single decimal place.
	toDBm := func(raw uint16) float32 {
		return float32(math.Round((float64(mm.twosComplementToSignedInt16(raw))/500.0)*10) / 10)
	}

	// get the ANI-G instance IDs
	anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
loop:
	for _, anigInstID := range anigInstKeys {
		// Get the ANI-G instance optical power attributes
		requestedAttributes := me.AttributeValueMap{"OpticalSignalLevel": 0, "TransmitOpticalLevel": 0}
		if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.AniGClassID, anigInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance == nil {
			// The request was not sent, so no reply will arrive and there is nothing
			// to report for this instance.
			continue
		}

		var meAttributes me.AttributeValueMap
		select {
		case meAttributes = <-mm.opticalMetricsChan:
			logger.Debugw(ctx, "received optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// Abandon the remaining instances; entries already collected are returned.
			break loop
		}

		opticalMetrics := make(map[string]float32)
		// Populate metric only if it was enabled.
		for k := range OpticalPowerGroupMetrics {
			switch k {
			case "ani_g_instance_id":
				if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
					opticalMetrics[k] = float32(val.(uint16))
				}
			case "transmit_power_dBm":
				if val, ok := meAttributes["TransmitOpticalLevel"]; ok && val != nil {
					opticalMetrics[k] = toDBm(val.(uint16))
				}
			case "receive_power_dBm":
				if val, ok := meAttributes["OpticalSignalLevel"]; ok && val != nil {
					opticalMetrics[k] = toDBm(val.(uint16))
				}
			default:
				// do nothing
			}
		}
		// One entry per ANI-G instance, since optical metrics are collected per instance.
		metricInfoSlice = append(metricInfoSlice, &voltha.MetricInformation{Metadata: &mmd, Metrics: opticalMetrics})
	}

	return metricInfoSlice
}
703
// collectUniStatusMetrics collects the UNI status group metrics from the UNI-G, PPTP
// Ethernet UNI and VEIP ME instances recorded in the ONU MIB. For each instance an
// OMCI Get is sent, and the reply is awaited on uniStatusMetricsChan, which
// handleOmciGetResponseMessage feeds. One MetricInformation entry is produced per
// instance that answered.
//
// It returns nil if the UNI status group is disabled. An instance whose Get could not
// be sent is skipped for all three ME classes. A reply timeout abandons the remaining
// instances of that ME class, and collection then continues with the next class.
// nolint: gocyclo
func (mm *onuMetricsManager) collectUniStatusMetrics(ctx context.Context) []*voltha.MetricInformation {
	logger.Debugw(ctx, "collectUniStatusMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
	mm.onuMetricsManagerLock.RLock()
	if !mm.groupMetricMap[UniStatusGroupMetricName].enabled {
		mm.onuMetricsManagerLock.RUnlock()
		logger.Debugw(ctx, "uni status group metric is not enabled", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		return nil
	}
	mm.onuMetricsManagerLock.RUnlock()

	var metricInfoSlice []*voltha.MetricInformation
	metricsContext := make(map[string]string)
	metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
	metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
	metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType

	raisedTs := time.Now().Unix()
	mmd := voltha.MetricMetaData{
		Title:           "UniStatus", // Is this ok to hard code?
		Ts:              float64(raisedTs),
		Context:         metricsContext,
		DeviceId:        mm.pDeviceHandler.deviceID,
		LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
		SerialNo:        mm.pDeviceHandler.device.SerialNumber,
	}

	// addEntityInfo adds three things to metrics: the ME instance ID (when the reply
	// carries one), the UNI port number mapped to that instance (when one is found), and
	// the ME class ID.
	addEntityInfo := func(metrics map[string]float32, meAttributes me.AttributeValueMap, meClassID float32) {
		if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
			entityID := val.(uint16)
			metrics["entity_id"] = float32(entityID)
			// TODO: Rlock needed for reading uniEntityMap? May not be needed given uniEntityMap is populated setup at initial ONU bring up
			for _, uni := range mm.pDeviceHandler.uniEntityMap {
				if uni.entityID == entityID {
					metrics["uni_port_no"] = float32(uni.portNo)
					break
				}
			}
		}
		metrics["me_class_id"] = meClassID
	}

	// get the UNI-G instance IDs
	unigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.UniGClassID)
loop1:
	for _, unigInstID := range unigInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		// Get the UNI-G instance administrative state
		requestedAttributes := me.AttributeValueMap{"AdministrativeState": 0}
		if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.UniGClassID, unigInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance == nil {
			continue // request not sent: nothing to report for this instance
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received uni-g metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// The metrics could be empty in this case
			break loop1
		}
		unigMetrics := make(map[string]float32)
		// Populate metric only if it was enabled.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					unigMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		addEntityInfo(unigMetrics, meAttributes, float32(me.UniGClassID))
		// create slice of metrics given that there could be more than one UNI-G instance
		metricInfoSlice = append(metricInfoSlice, &voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics})
	}

	// get the PPTP instance IDs
	pptpInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.PhysicalPathTerminationPointEthernetUniClassID)
loop2:
	for _, pptpInstID := range pptpInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		requestedAttributes := me.AttributeValueMap{"SensedType": 0, "OperationalState": 0, "AdministrativeState": 0}
		if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.PhysicalPathTerminationPointEthernetUniClassID, pptpInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance == nil {
			// Previously an entry holding only me_class_id was published here; skip it
			// like the UNI-G loop does.
			continue
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received pptp metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// The metrics could be empty in this case
			break loop2
		}
		pptpMetrics := make(map[string]float32)
		// Populate metric only if it was enabled.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "sensed_type":
				if val, ok := meAttributes["SensedType"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			case "oper_status":
				if val, ok := meAttributes["OperationalState"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					pptpMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		addEntityInfo(pptpMetrics, meAttributes, float32(me.PhysicalPathTerminationPointEthernetUniClassID))
		// create slice of metrics given that there could be more than one PPTP instance
		metricInfoSlice = append(metricInfoSlice, &voltha.MetricInformation{Metadata: &mmd, Metrics: pptpMetrics})
	}

	// get the VEIP instance IDs
	veipInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.VirtualEthernetInterfacePointClassID)
loop3:
	for _, veipInstID := range veipInstKeys {
		// TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
		// to uniquely identify this ME instance and also to correlate the ME instance to physical instance
		requestedAttributes := me.AttributeValueMap{"OperationalState": 0, "AdministrativeState": 0}
		if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance == nil {
			continue // request not sent: nothing to report for this instance
		}
		var meAttributes me.AttributeValueMap
		// Wait for metrics or timeout
		select {
		case meAttributes = <-mm.uniStatusMetricsChan:
			logger.Debugw(ctx, "received veip metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
		case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
			logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
			// The metrics could be empty in this case
			break loop3
		}
		veipMetrics := make(map[string]float32)
		// Populate metric only if it was enabled.
		for k := range UniStatusGroupMetrics {
			switch k {
			case "oper_status":
				if val, ok := meAttributes["OperationalState"]; ok && val != nil {
					veipMetrics[k] = float32(val.(byte))
				}
			case "uni_admin_state":
				if val, ok := meAttributes["AdministrativeState"]; ok && val != nil {
					veipMetrics[k] = float32(val.(byte))
				}
			default:
				// do nothing
			}
		}
		addEntityInfo(veipMetrics, meAttributes, float32(me.VirtualEthernetInterfacePointClassID))
		// create slice of metrics given that there could be more than one VEIP instance
		metricInfoSlice = append(metricInfoSlice, &voltha.MetricInformation{Metadata: &mmd, Metrics: veipMetrics})
	}

	return metricInfoSlice
}
899
900// publishMetrics publishes the metrics on kafka
901func (mm *onuMetricsManager) publishMetrics(ctx context.Context, metricInfo []*voltha.MetricInformation) {
902 var ke voltha.KpiEvent2
Girish Gowdra5a7c4922021-01-22 18:33:41 -0800903 ts := time.Now().Unix()
Girish Gowdrae09a6202021-01-12 18:10:59 -0800904 ke.SliceData = metricInfo
905 ke.Type = voltha.KpiEventType_slice
906 ke.Ts = float64(ts)
907
908 if err := mm.pDeviceHandler.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, ts); err != nil {
909 logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
910 }
911}
912
913func (mm *onuMetricsManager) processOmciMessages(ctx context.Context) {
914 logger.Infow(ctx, "Start routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
915 // Flush metric collection channels to be safe.
916 // It is possible that there is stale data on this channel if the processOmciMessages routine
917 // is stopped right after issuing a OMCI-GET request and started again.
918 // The processOmciMessages routine will get stopped if startCollector routine (in device_handler.go)
919 // is stopped - as a result of ONU going down.
920 mm.flushMetricCollectionChannels(ctx)
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -0700921 mm.updateOmciProcessingStatus(true)
Girish Gowdrae09a6202021-01-12 18:10:59 -0800922 for {
923 select {
924 case <-mm.stopProcessingOmciResponses: // stop this routine
925 logger.Infow(ctx, "Stop routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -0700926 mm.updateOmciProcessingStatus(false)
Girish Gowdrae09a6202021-01-12 18:10:59 -0800927 return
Girish Gowdrae0140f02021-02-02 16:55:09 -0800928 case message, ok := <-mm.pAdaptFsm.commChan:
Girish Gowdrae09a6202021-01-12 18:10:59 -0800929 if !ok {
930 logger.Errorw(ctx, "Message couldn't be read from channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
931 continue
932 }
933 logger.Debugw(ctx, "Received message on ONU metrics channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
934
935 switch message.Type {
936 case OMCI:
937 msg, _ := message.Data.(OmciMessage)
938 mm.handleOmciMessage(ctx, msg)
939 default:
940 logger.Warn(ctx, "Unknown message type received", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "message.Type": message.Type})
941 }
942 }
943 }
944}
945
// handleOmciMessage routes a received OMCI message to the handler for its message type.
// Unknown message types are logged as warnings. Errors returned by the handlers are
// currently discarded (see TODO).
func (mm *onuMetricsManager) handleOmciMessage(ctx context.Context, msg OmciMessage) {
	msgType := msg.OmciMsg.MessageType
	logger.Debugw(ctx, "omci Msg", log.Fields{"device-id": mm.pDeviceHandler.deviceID,
		"msgType": msgType, "msg": msg})

	var handler func(context.Context, OmciMessage) error
	switch msgType {
	case omci.GetResponseType:
		handler = mm.handleOmciGetResponseMessage
	case omci.SynchronizeTimeResponseType:
		handler = mm.handleOmciSynchronizeTimeResponseMessage
	case omci.CreateResponseType:
		handler = mm.handleOmciCreateResponseMessage
	case omci.DeleteResponseType:
		handler = mm.handleOmciDeleteResponseMessage
	default:
		logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msgType})
		return
	}
	//TODO: error handling
	_ = handler(ctx, msg)
}
964
965func (mm *onuMetricsManager) handleOmciGetResponseMessage(ctx context.Context, msg OmciMessage) error {
966 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeGetResponse)
967 if msgLayer == nil {
968 logger.Errorw(ctx, "omci Msg layer could not be detected for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
969 return fmt.Errorf("omci Msg layer could not be detected for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
970 }
971 msgObj, msgOk := msgLayer.(*omci.GetResponse)
972 if !msgOk {
973 logger.Errorw(ctx, "omci Msg layer could not be assigned for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
974 return fmt.Errorf("omci Msg layer could not be assigned for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
975 }
976 logger.Debugw(ctx, "OMCI GetResponse Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
977 if msgObj.Result == me.Success {
978 meAttributes := msgObj.Attributes
979 switch msgObj.EntityClass {
980 case me.AniGClassID:
981 mm.opticalMetricsChan <- meAttributes
982 return nil
983 case me.UniGClassID:
984 mm.uniStatusMetricsChan <- meAttributes
985 return nil
986 case me.PhysicalPathTerminationPointEthernetUniClassID:
987 mm.uniStatusMetricsChan <- meAttributes
988 return nil
989 case me.VirtualEthernetInterfacePointClassID:
990 mm.uniStatusMetricsChan <- meAttributes
991 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -0800992 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
993 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -0800994 me.EthernetPerformanceMonitoringHistoryDataClassID,
995 me.FecPerformanceMonitoringHistoryDataClassID,
996 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -0800997 mm.l2PmChan <- meAttributes
Girish Gowdrae09a6202021-01-12 18:10:59 -0800998 default:
999 logger.Errorw(ctx, "unhandled omci get response message",
1000 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
1001 }
1002 }
1003
Girish Gowdrae0140f02021-02-02 16:55:09 -08001004 return fmt.Errorf("unhandled-omci-get-response-message")
1005}
1006
1007func (mm *onuMetricsManager) handleOmciSynchronizeTimeResponseMessage(ctx context.Context, msg OmciMessage) error {
1008 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeSynchronizeTimeResponse)
1009 if msgLayer == nil {
1010 logger.Errorw(ctx, "omci Msg layer could not be detected for synchronize time response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1011 return fmt.Errorf("omci Msg layer could not be detected for synchronize time response - handling stopped: %s", mm.pDeviceHandler.deviceID)
1012 }
1013 msgObj, msgOk := msgLayer.(*omci.SynchronizeTimeResponse)
1014 if !msgOk {
1015 logger.Errorw(ctx, "omci Msg layer could not be assigned for synchronize time response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1016 return fmt.Errorf("omci Msg layer could not be assigned for synchronize time response - handling stopped: %s", mm.pDeviceHandler.deviceID)
1017 }
1018 logger.Debugw(ctx, "OMCI synchronize time response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
1019 if msgObj.Result == me.Success {
1020 switch msgObj.EntityClass {
1021 case me.OnuGClassID:
1022 logger.Infow(ctx, "omci synchronize time success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1023 mm.syncTimeResponseChan <- true
1024 return nil
1025 default:
1026 logger.Errorw(ctx, "unhandled omci message",
1027 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
1028 }
1029 }
1030 mm.syncTimeResponseChan <- false
1031 logger.Errorf(ctx, "unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
1032 return fmt.Errorf("unhandled-omci-synchronize-time-response-message--error-code-%v", msgObj.Result)
Girish Gowdrae09a6202021-01-12 18:10:59 -08001033}
1034
1035// flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
1036func (mm *onuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
1037 // flush commMetricsChan
1038 select {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001039 case <-mm.pAdaptFsm.commChan:
Girish Gowdrae09a6202021-01-12 18:10:59 -08001040 logger.Debug(ctx, "flushed common metrics channel")
1041 default:
1042 }
1043
1044 // flush opticalMetricsChan
1045 select {
1046 case <-mm.opticalMetricsChan:
1047 logger.Debug(ctx, "flushed optical metrics channel")
1048 default:
1049 }
1050
1051 // flush uniStatusMetricsChan
1052 select {
1053 case <-mm.uniStatusMetricsChan:
1054 logger.Debug(ctx, "flushed uni status metrics channel")
1055 default:
1056 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001057
1058 // flush syncTimeResponseChan
1059 select {
1060 case <-mm.syncTimeResponseChan:
1061 logger.Debug(ctx, "flushed sync time response channel")
1062 default:
1063 }
1064
1065 // flush l2PmChan
1066 select {
1067 case <-mm.l2PmChan:
1068 logger.Debug(ctx, "flushed L2 PM collection channel")
1069 default:
1070 }
1071
1072 // flush stopTicks
1073 select {
1074 case <-mm.stopTicks:
1075 logger.Debug(ctx, "flushed stopTicks channel")
1076 default:
1077 }
1078
1079}
1080
1081// ** L2 PM FSM Handlers start **
1082
1083func (mm *onuMetricsManager) l2PMFsmStarting(ctx context.Context, e *fsm.Event) {
Girish Gowdra0e533642021-03-02 22:02:51 -08001084 // restore data from KV store
1085 if err := mm.restorePmData(ctx); err != nil {
1086 logger.Errorw(ctx, "error restoring pm data", log.Fields{"err": err})
1087 // we continue given that it does not effect the actual services for the ONU,
1088 // but there may be some negative effect on PM collection (there may be some mismatch in
1089 // the actual PM config and what is present on the device).
1090 }
1091
Girish Gowdrae0140f02021-02-02 16:55:09 -08001092 // Loop through all the group metrics
1093 // If it is a L2 PM Interval metric and it is enabled, then if it is not in the
1094 // list of active L2 PM list then mark it for creation
1095 // It it is a L2 PM Interval metric and it is disabled, then if it is in the
1096 // list of active L2 PM list then mark it for deletion
1097 mm.onuMetricsManagerLock.Lock()
1098 for n, g := range mm.groupMetricMap {
1099 if g.isL2PMCounter { // it is a l2 pm counter
1100 if g.enabled { // metric enabled.
1101 found := false
1102 inner1:
1103 for _, v := range mm.activeL2Pms {
1104 if v == n {
1105 found = true // metric already present in active l2 pm list
1106 break inner1
1107 }
1108 }
1109 if !found { // metric not in active l2 pm list. Mark this to be added later
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001110 mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, n)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001111 }
1112 } else { // metric not enabled.
1113 found := false
1114 inner2:
1115 for _, v := range mm.activeL2Pms {
1116 if v == n {
1117 found = true // metric is found in active l2 pm list
1118 break inner2
1119 }
1120 }
1121 if found { // metric is found in active l2 pm list. Mark this to be deleted later
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001122 mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, n)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001123 }
1124 }
1125 }
1126 }
1127 mm.onuMetricsManagerLock.Unlock()
1128 logger.Debugw(ctx, "pms to add and delete",
1129 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": mm.l2PmToAdd, "pms-to-delete": mm.l2PmToDelete})
1130 go func() {
1131 // push a tick event to move to next state
1132 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventTick); err != nil {
1133 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1134 }
1135 }()
1136}
1137
// l2PMFsmSyncTime is the L2 PM FSM handler for the sync-time state. It synchronizes
// time with the ONU to establish the 15 min PM collection boundary. It then starts
// the tick generator and fires l2PmEventSuccess asynchronously.
//
// NOTE(review): when syncTime fails, a delayed l2PmEventFailure is scheduled as a
// retry, but execution still falls through: generateTicks is started and
// l2PmEventSuccess is fired immediately. By the time the failure event fires, the FSM
// has most likely left this state already. Check the FSM transition table and
// callbacks before adding an early return here. If the failure transition is a
// self-transition that does not re-enter this handler, an early return could leave
// the FSM stuck.
func (mm *onuMetricsManager) l2PMFsmSyncTime(ctx context.Context, e *fsm.Event) {
	fireEvent := func(event string) {
		if err := mm.pAdaptFsm.pFsm.Event(event); err != nil {
			logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
		}
	}

	// Sync time with the ONU to establish 15min boundary for PM collection.
	if syncErr := mm.syncTime(ctx); syncErr != nil {
		go func() {
			// retry to sync time after this timeout
			time.Sleep(SyncTimeRetryInterval * time.Second)
			// This will result in FSM attempting to sync time again
			fireEvent(l2PmEventFailure)
		}()
	}

	// Initiate a tick generation routine every L2PmCollectionInterval
	go mm.generateTicks(ctx)

	go fireEvent(l2PmEventSuccess)
}
1158
1159func (mm *onuMetricsManager) l2PMFsmNull(ctx context.Context, e *fsm.Event) {
1160 // We need to reset the local data so that the L2 PM MEs are re-provisioned once the ONU is back up based on the latest PM CONFIG
1161 mm.onuMetricsManagerLock.Lock()
1162 mm.activeL2Pms = nil
1163 mm.l2PmToAdd = nil
1164 mm.l2PmToDelete = nil
1165 mm.onuMetricsManagerLock.Unlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001166 // If the FSM was stopped, then clear PM data from KV store
1167 // The FSM is stopped when ONU goes down. It is time to clear its data from store
1168 if e.Event == l2PmEventStop {
1169 _ = mm.clearPmGroupData(ctx) // ignore error
1170 }
1171
Girish Gowdrae0140f02021-02-02 16:55:09 -08001172}
1173func (mm *onuMetricsManager) l2PMFsmIdle(ctx context.Context, e *fsm.Event) {
1174 logger.Debugw(ctx, "Enter state idle", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1175
1176 mm.onuMetricsManagerLock.RLock()
1177 numOfPmToDelete := len(mm.l2PmToDelete)
1178 numOfPmToAdd := len(mm.l2PmToAdd)
1179 mm.onuMetricsManagerLock.RUnlock()
1180
1181 if numOfPmToDelete > 0 {
1182 logger.Debugw(ctx, "state idle - pms to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": numOfPmToDelete})
1183 go func() {
1184 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventDeleteMe); err != nil {
1185 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1186 }
1187 }()
1188 } else if numOfPmToAdd > 0 {
1189 logger.Debugw(ctx, "state idle - pms to add", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": numOfPmToAdd})
1190 go func() {
1191 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventAddMe); err != nil {
1192 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1193 }
1194 }()
1195 }
1196}
1197
1198func (mm *onuMetricsManager) l2PmFsmCollectData(ctx context.Context, e *fsm.Event) {
1199 logger.Debugw(ctx, "state collect data", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1200 // Copy the activeL2Pms for which we want to collect the metrics since activeL2Pms can change dynamically
1201 mm.onuMetricsManagerLock.RLock()
1202 copyOfActiveL2Pms := make([]string, len(mm.activeL2Pms))
1203 _ = copy(copyOfActiveL2Pms, mm.activeL2Pms)
1204 mm.onuMetricsManagerLock.RUnlock()
1205
1206 for _, n := range copyOfActiveL2Pms {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001207 var metricInfoSlice []*voltha.MetricInformation
Girish Gowdra0e533642021-03-02 22:02:51 -08001208
1209 // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
1210 mm.onuMetricsManagerLock.RLock()
1211 copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
1212 _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
1213 mm.onuMetricsManagerLock.RUnlock()
1214
Girish Gowdrae0140f02021-02-02 16:55:09 -08001215 switch n {
1216 case EthernetBridgeHistoryName:
1217 logger.Debugw(ctx, "state collect data - collecting data for EthernetFramePerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra0e533642021-03-02 22:02:51 -08001218 for _, entityID := range copyOfEntityIDs {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001219 if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, true, entityID); metricInfo != nil { // upstream
1220 metricInfoSlice = append(metricInfoSlice, metricInfo)
1221 }
1222 if metricInfo := mm.collectEthernetFramePerformanceMonitoringHistoryData(ctx, false, entityID); metricInfo != nil { // downstream
1223 metricInfoSlice = append(metricInfoSlice, metricInfo)
1224 }
1225 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001226 case EthernetUniHistoryName:
1227 logger.Debugw(ctx, "state collect data - collecting data for EthernetPerformanceMonitoringHistoryData ME", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra0e533642021-03-02 22:02:51 -08001228 for _, entityID := range copyOfEntityIDs {
1229 if metricInfo := mm.collectEthernetUniHistoryData(ctx, entityID); metricInfo != nil { // upstream
1230 metricInfoSlice = append(metricInfoSlice, metricInfo)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001231 }
1232 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001233
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001234 case FecHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001235 for _, entityID := range copyOfEntityIDs {
1236 if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001237 metricInfoSlice = append(metricInfoSlice, metricInfo)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001238 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001239 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001240 case GemPortHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001241 for _, entityID := range copyOfEntityIDs {
1242 if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001243 metricInfoSlice = append(metricInfoSlice, metricInfo)
1244 }
1245 }
1246
Girish Gowdrae0140f02021-02-02 16:55:09 -08001247 default:
1248 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1249 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001250 mm.handleMetricsPublish(ctx, n, metricInfoSlice)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001251 }
1252 // Does not matter we send success or failure here.
1253 // Those PMs that we failed to collect data will be attempted to collect again in the next PM collection cycle (assuming
1254 // we have not exceed max attempts to collect the PM data)
1255 go func() {
1256 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1257 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1258 }
1259 }()
1260}
1261
Girish Gowdra0e533642021-03-02 22:02:51 -08001262// nolint: gocyclo
Girish Gowdrae0140f02021-02-02 16:55:09 -08001263func (mm *onuMetricsManager) l2PmFsmCreatePM(ctx context.Context, e *fsm.Event) {
1264 // Copy the l2PmToAdd for which we want to collect the metrics since l2PmToAdd can change dynamically
1265 mm.onuMetricsManagerLock.RLock()
1266 copyOfL2PmToAdd := make([]string, len(mm.l2PmToAdd))
1267 _ = copy(copyOfL2PmToAdd, mm.l2PmToAdd)
1268 mm.onuMetricsManagerLock.RUnlock()
1269
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001270 logger.Debugw(ctx, "state create pm - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-add": copyOfL2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001271 for _, n := range copyOfL2PmToAdd {
1272 resp := false
Girish Gowdra0e533642021-03-02 22:02:51 -08001273 atLeastOneSuccess := false // flag indicates if at least one ME instance of the PM was successfully created.
1274 cnt := 0
Girish Gowdrae0140f02021-02-02 16:55:09 -08001275 switch n {
1276 case EthernetBridgeHistoryName:
1277 boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
1278 boolForDirection = append(boolForDirection, true, false)
1279 // Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
1280 for _, direction := range boolForDirection {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001281 for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
1282 // Attach the EthernetFramePerformanceMonitoringHistoryData ME to MacBridgePortConfigData on the UNI port
1283 entityID := macBridgePortAniEID + uniPort.entityID
Girish Gowdra0e533642021-03-02 22:02:51 -08001284 _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
1285 inner1:
1286 // retry L2PmCreateAttempts times to create the instance of PM
1287 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1288 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001289 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, direction, true, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001290 if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetFramePerformanceMonitoringHistoryData"); resp {
1291 atLeastOneSuccess = true
1292 _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
1293 break inner1
1294 }
1295 }
1296 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1297 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001298 }
1299 }
1300 }
1301 case EthernetUniHistoryName:
Girish Gowdrae0140f02021-02-02 16:55:09 -08001302 for _, uniPort := range mm.pDeviceHandler.uniEntityMap {
1303 if uniPort.portType == uniPPTP { // This metric is only applicable for PPTP Uni Type
Girish Gowdra0e533642021-03-02 22:02:51 -08001304 // Attach the EthernetPerformanceMonitoringHistoryData ME to PPTP port instance
Girish Gowdrae0140f02021-02-02 16:55:09 -08001305 entityID := uniPort.entityID
Girish Gowdra0e533642021-03-02 22:02:51 -08001306 _ = mm.updatePmData(ctx, n, entityID, cPmAdd) // TODO: ignore error for now
1307 inner2:
1308 // retry L2PmCreateAttempts times to create the instance of PM
1309 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1310 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001311 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001312 if resp = mm.waitForResponseOrTimeout(ctx, true, entityID, "EthernetPerformanceMonitoringHistoryData"); resp {
1313 atLeastOneSuccess = true
1314 _ = mm.updatePmData(ctx, n, entityID, cPmAdded) // TODO: ignore error for now
1315 break inner2
1316 }
1317 }
1318 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1319 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001320 }
1321 }
1322 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001323 case FecHistoryName:
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001324 for _, anigInstID := range mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID) {
Girish Gowdra0e533642021-03-02 22:02:51 -08001325 // Attach the FecPerformanceMonitoringHistoryData ME to the ANI-G ME instance
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001326 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001327 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, anigInstID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001328 _ = mm.updatePmData(ctx, n, anigInstID, cPmAdd) // TODO: ignore error for now
1329 inner3:
1330 // retry L2PmCreateAttempts times to create the instance of PM
1331 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1332 if resp = mm.waitForResponseOrTimeout(ctx, true, anigInstID, "FecPerformanceMonitoringHistoryData"); resp {
1333 atLeastOneSuccess = true
1334 _ = mm.updatePmData(ctx, n, anigInstID, cPmAdded) // TODO: ignore error for now
1335 break inner3
1336 }
1337 }
1338 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1339 _ = mm.updatePmData(ctx, n, anigInstID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001340 }
1341 }
1342 case GemPortHistoryName:
1343
1344 mm.onuMetricsManagerLock.RLock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001345 copyOfGemPortInstIDsToAdd := make([]uint16, len(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd))
1346 _ = copy(copyOfGemPortInstIDsToAdd, mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001347 mm.onuMetricsManagerLock.RUnlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001348
1349 if len(copyOfGemPortInstIDsToAdd) == 0 {
1350 // If there are no gemport history MEs to be created, just skip further processing
1351 // Otherwise down below (after 'switch' case handling) we assume the ME creation failed because resp and atLeastOneSuccess flag are false.
1352 // Normally there are no GemPortHistory MEs to create at start up. They come in only after provisioning service on the ONU.
1353 mm.onuMetricsManagerLock.Lock()
1354 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
1355 mm.onuMetricsManagerLock.Unlock()
1356 continue
1357 }
1358
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001359 for _, v := range copyOfGemPortInstIDsToAdd {
1360 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001361 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, true, mm.pAdaptFsm.commChan, v)
Girish Gowdra0e533642021-03-02 22:02:51 -08001362 _ = mm.updatePmData(ctx, n, v, cPmAdd) // TODO: ignore error for now
1363 inner4:
1364 // retry L2PmCreateAttempts times to create the instance of PM
1365 for cnt = 0; cnt < L2PmCreateAttempts; cnt++ {
1366 if resp = mm.waitForResponseOrTimeout(ctx, true, v, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); resp {
1367 atLeastOneSuccess = true
1368 _ = mm.updatePmData(ctx, n, v, cPmAdded) // TODO: ignore error for now
1369 break inner4
1370 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001371 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001372 if cnt == L2PmCreateAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1373 _ = mm.updatePmData(ctx, n, v, cPmRemoved) // TODO: ignore error for now
1374 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001375 }
1376
Girish Gowdrae0140f02021-02-02 16:55:09 -08001377 default:
1378 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1379 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001380 // On success of at least one instance of the PM for a given ME, update the local list maintained for active PMs and PMs to add
1381 if atLeastOneSuccess {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001382 mm.onuMetricsManagerLock.Lock()
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001383 mm.activeL2Pms = mm.appendIfMissingString(mm.activeL2Pms, n)
1384 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
1385 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001386 mm.onuMetricsManagerLock.Unlock()
1387 } else {
Girish Gowdra0e533642021-03-02 22:02:51 -08001388 // If we are here then no instance of the PM of the given ME were created successfully, so locally disable the PM
Girish Gowdrae0140f02021-02-02 16:55:09 -08001389 // and also remove it from l2PmToAdd slice so that we do not try to create the PM ME anymore
1390 mm.onuMetricsManagerLock.Lock()
Girish Gowdra0e533642021-03-02 22:02:51 -08001391 logger.Debugw(ctx, "exceeded-max-add-retry-attempts--disabling-group", log.Fields{"groupName": n})
1392 mm.groupMetricMap[n].enabled = false
1393 mm.l2PmToAdd = mm.removeIfFoundString(mm.l2PmToAdd, n)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001394
Girish Gowdrae0140f02021-02-02 16:55:09 -08001395 logger.Warnw(ctx, "state create pm - failed to create pm",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001396 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
Girish Gowdra0e533642021-03-02 22:02:51 -08001397 "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001398 mm.onuMetricsManagerLock.Unlock()
1399 }
1400 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001401 logger.Debugw(ctx, "state create pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-add": mm.l2PmToAdd})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001402 // Does not matter we send success or failure here.
1403 // Those PMs that we failed to create will be attempted to create again in the next PM creation cycle (assuming
1404 // we have not exceed max attempts to create the PM ME)
1405 go func() {
1406 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1407 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1408 }
1409 }()
1410}
1411
Girish Gowdra0e533642021-03-02 22:02:51 -08001412// nolint: gocyclo
Girish Gowdrae0140f02021-02-02 16:55:09 -08001413func (mm *onuMetricsManager) l2PmFsmDeletePM(ctx context.Context, e *fsm.Event) {
1414 // Copy the l2PmToDelete for which we want to collect the metrics since l2PmToDelete can change dynamically
1415 mm.onuMetricsManagerLock.RLock()
1416 copyOfL2PmToDelete := make([]string, len(mm.l2PmToDelete))
1417 _ = copy(copyOfL2PmToDelete, mm.l2PmToDelete)
1418 mm.onuMetricsManagerLock.RUnlock()
1419
1420 logger.Debugw(ctx, "state delete pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pms-to-delete": mm.l2PmToDelete})
1421 for _, n := range copyOfL2PmToDelete {
1422 resp := false
Girish Gowdra0e533642021-03-02 22:02:51 -08001423 cnt := 0
1424 atLeastOneDeleteFailure := false
1425
1426 // mm.groupMetricMap[n].pmMEData.InstancesActive could dynamically change, so make a copy
1427 mm.onuMetricsManagerLock.RLock()
1428 copyOfEntityIDs := make([]uint16, len(mm.groupMetricMap[n].pmMEData.InstancesActive))
1429 _ = copy(copyOfEntityIDs, mm.groupMetricMap[n].pmMEData.InstancesActive)
1430 mm.onuMetricsManagerLock.RUnlock()
1431
1432 if len(copyOfEntityIDs) == 0 {
1433 // if there are no enityIDs to remove for the PM ME just clear the PM name entry from cache and continue
1434 mm.onuMetricsManagerLock.Lock()
1435 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1436 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1437 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
1438 mm.onuMetricsManagerLock.Unlock()
1439 continue
1440 }
1441 logger.Debugw(ctx, "entities to delete", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n, "entityIDs": copyOfEntityIDs})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001442 switch n {
1443 case EthernetBridgeHistoryName:
1444 boolForDirection := make([]bool, 2) // stores true and false to indicate upstream and downstream directions.
1445 boolForDirection = append(boolForDirection, true, false)
1446 // Create ME twice, one for each direction. Boolean true is used to indicate upstream and false for downstream.
1447 for _, direction := range boolForDirection {
Girish Gowdra0e533642021-03-02 22:02:51 -08001448 for _, entityID := range copyOfEntityIDs {
1449 inner1:
1450 // retry L2PmDeleteAttempts times to delete the instance of PM
1451 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1452 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetPerformanceMonitoringHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001453 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, direction, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001454 _ = mm.updatePmData(ctx, n, entityID, cPmRemove) // TODO: ignore error for now
1455 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetFramePerformanceMonitoringHistoryData"); !resp {
1456 atLeastOneDeleteFailure = true
1457 } else {
1458 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1459 break inner1
1460 }
1461 }
1462 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1463 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdrae0140f02021-02-02 16:55:09 -08001464 }
1465 }
1466 }
1467 case EthernetUniHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001468 for _, entityID := range copyOfEntityIDs {
1469 inner2:
1470 // retry L2PmDeleteAttempts times to delete the instance of PM
1471 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001472 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteEthernetUniHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001473 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001474 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "EthernetPerformanceMonitoringHistoryData"); !resp {
Girish Gowdra0e533642021-03-02 22:02:51 -08001475 atLeastOneDeleteFailure = true
1476 } else {
1477 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001478 break inner2
Girish Gowdrae0140f02021-02-02 16:55:09 -08001479 }
1480 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001481 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1482 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1483 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001484 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001485 case FecHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001486 for _, entityID := range copyOfEntityIDs {
1487 inner3:
1488 // retry L2PmDeleteAttempts times to delete the instance of PM
1489 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1490 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteFecHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001491 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001492 if resp := mm.waitForResponseOrTimeout(ctx, false, entityID, "FecPerformanceMonitoringHistoryData"); !resp {
1493 atLeastOneDeleteFailure = true
1494 } else {
1495 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1496 break inner3
1497 }
1498 }
1499 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1500 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001501 }
1502 }
1503 case GemPortHistoryName:
Girish Gowdra0e533642021-03-02 22:02:51 -08001504 for _, entityID := range copyOfEntityIDs {
1505 inner4:
1506 // retry L2PmDeleteAttempts times to delete the instance of PM
1507 for cnt = 0; cnt < L2PmDeleteAttempts; cnt++ {
1508 mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendCreateOrDeleteGemPortHistoryME(
Girish Gowdra0b235842021-03-09 13:06:46 -08001509 ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, false, mm.pAdaptFsm.commChan, entityID)
Girish Gowdra0e533642021-03-02 22:02:51 -08001510 if resp = mm.waitForResponseOrTimeout(ctx, false, entityID, "GemPortNetworkCtpPerformanceMonitoringHistoryData"); !resp {
1511 atLeastOneDeleteFailure = true
1512 } else {
1513 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1514 break inner4
1515 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001516 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001517 if cnt == L2PmDeleteAttempts { // if we reached max attempts just give up hope on this given instance of the PM ME
1518 _ = mm.updatePmData(ctx, n, entityID, cPmRemoved) // TODO: ignore error for now
1519 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001520 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001521 default:
1522 logger.Errorw(ctx, "unsupported l2 pm", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "name": n})
1523 }
Girish Gowdra0e533642021-03-02 22:02:51 -08001524 // If we could not completely clean up the PM ME then just give up.
1525 if atLeastOneDeleteFailure {
1526 logger.Warnw(ctx, "state delete pm - failed to delete at least one instance of the PM ME",
1527 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": n,
1528 "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
1529 mm.onuMetricsManagerLock.Lock()
1530 logger.Debugw(ctx, "exceeded-max-delete-retry-attempts--disabling-group", log.Fields{"groupName": n})
1531 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1532 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1533 mm.groupMetricMap[n].enabled = false
1534 mm.onuMetricsManagerLock.Unlock()
1535 } else { // success case
Girish Gowdrae0140f02021-02-02 16:55:09 -08001536 mm.onuMetricsManagerLock.Lock()
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001537 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, n)
1538 mm.l2PmToDelete = mm.removeIfFoundString(mm.l2PmToDelete, n)
1539 logger.Debugw(ctx, "success-resp", log.Fields{"pm-name": n, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001540 mm.onuMetricsManagerLock.Unlock()
1541 }
1542 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001543 logger.Debugw(ctx, "state delete pm - done", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "active-l2-pms": mm.activeL2Pms, "pms-to-delete": mm.l2PmToDelete})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001544 // Does not matter we send success or failure here.
1545 // Those PMs that we failed to delete will be attempted to create again in the next PM collection cycle
1546 go func() {
1547 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventSuccess); err != nil {
1548 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
1549 }
1550 }()
1551}
1552
1553// ** L2 PM FSM Handlers end **
1554
1555// syncTime synchronizes time with the ONU to establish a 15 min boundary for PM collection and reporting.
1556func (mm *onuMetricsManager) syncTime(ctx context.Context) error {
Girish Gowdra0b235842021-03-09 13:06:46 -08001557 if err := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendSyncTime(ctx, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); err != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001558 logger.Errorw(ctx, "cannot send sync time request", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1559 return err
1560 }
1561
1562 select {
Girish Gowdra0b235842021-03-09 13:06:46 -08001563 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -07001564 logger.Errorw(ctx, "timed out waiting for sync time response from onu", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdrae0140f02021-02-02 16:55:09 -08001565 return fmt.Errorf("timed-out-waiting-for-sync-time-response-%v", mm.pDeviceHandler.deviceID)
1566 case syncTimeRes := <-mm.syncTimeResponseChan:
1567 if !syncTimeRes {
1568 return fmt.Errorf("failed-to-sync-time-%v", mm.pDeviceHandler.deviceID)
1569 }
1570 logger.Infow(ctx, "sync time success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
1571 return nil
1572 }
1573}
1574
1575func (mm *onuMetricsManager) collectEthernetFramePerformanceMonitoringHistoryData(ctx context.Context, upstream bool, entityID uint16) *voltha.MetricInformation {
1576 var mEnt *me.ManagedEntity
1577 var omciErr me.OmciErrors
1578 var classID me.ClassID
1579 var meAttributes me.AttributeValueMap
1580 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1581 meParam := me.ParamData{EntityID: entityID}
1582 if upstream {
1583 if mEnt, omciErr = me.NewEthernetFramePerformanceMonitoringHistoryDataUpstream(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1584 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1585 return nil
1586 }
1587 classID = me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID
1588 } else {
1589 if mEnt, omciErr = me.NewEthernetFramePerformanceMonitoringHistoryDataDownstream(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1590 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream})
1591 return nil
1592 }
1593 classID = me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID
1594 }
1595
Girish Gowdrae0140f02021-02-02 16:55:09 -08001596 intervalEndTime := -1
1597 ethPMHistData := make(map[string]float32)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001598 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethPMHistData, &intervalEndTime); err != nil {
1599 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -08001600 }
1601
1602 // Populate some relevant context for the EthernetFramePerformanceMonitoringHistoryData PM
1603 ethPMHistData["class_id"] = float32(classID)
1604 ethPMHistData["interval_end_time"] = float32(intervalEndTime)
1605 ethPMHistData["parent_class_id"] = float32(me.MacBridgeConfigurationDataClassID) // EthernetFramePerformanceMonitoringHistoryData is attached to MBPCD ME
1606 ethPMHistData["parent_entity_id"] = float32(entityID)
1607 if upstream {
1608 ethPMHistData["upstream"] = float32(1)
1609 } else {
1610 ethPMHistData["upstream"] = float32(0)
1611 }
1612
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001613 metricInfo := mm.populateOnuMetricInfo(EthernetBridgeHistoryName, ethPMHistData)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001614
Girish Gowdrae0140f02021-02-02 16:55:09 -08001615 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData successful",
1616 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "upstream": upstream, "metricInfo": metricInfo})
1617 return &metricInfo
1618}
1619
1620func (mm *onuMetricsManager) collectEthernetUniHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1621 var mEnt *me.ManagedEntity
1622 var omciErr me.OmciErrors
1623 var classID me.ClassID
1624 var meAttributes me.AttributeValueMap
1625 logger.Debugw(ctx, "collecting data for EthernetFramePerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1626 meParam := me.ParamData{EntityID: entityID}
1627 if mEnt, omciErr = me.NewEthernetPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1628 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1629 return nil
1630 }
1631 classID = me.EthernetPerformanceMonitoringHistoryDataClassID
1632
Girish Gowdrae0140f02021-02-02 16:55:09 -08001633 intervalEndTime := -1
1634 ethUniHistData := make(map[string]float32)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001635 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethUniHistData, &intervalEndTime); err != nil {
1636 return nil
Girish Gowdrae0140f02021-02-02 16:55:09 -08001637 }
1638
1639 // Populate some relevant context for the EthernetPerformanceMonitoringHistoryData PM
1640 ethUniHistData["class_id"] = float32(classID)
1641 ethUniHistData["interval_end_time"] = float32(intervalEndTime)
1642
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001643 metricInfo := mm.populateOnuMetricInfo(EthernetUniHistoryName, ethUniHistData)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001644
Girish Gowdrae0140f02021-02-02 16:55:09 -08001645 logger.Debugw(ctx, "collecting data for EthernetPerformanceMonitoringHistoryData successful",
1646 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1647 return &metricInfo
1648}
1649
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001650func (mm *onuMetricsManager) collectFecHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1651 var mEnt *me.ManagedEntity
1652 var omciErr me.OmciErrors
1653 var classID me.ClassID
1654 var meAttributes me.AttributeValueMap
1655 logger.Debugw(ctx, "collecting data for FecPerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1656 meParam := me.ParamData{EntityID: entityID}
1657 if mEnt, omciErr = me.NewFecPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1658 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1659 return nil
1660 }
1661 classID = me.FecPerformanceMonitoringHistoryDataClassID
1662
1663 intervalEndTime := -1
1664 fecHistData := make(map[string]float32)
1665 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, fecHistData, &intervalEndTime); err != nil {
1666 return nil
1667 }
1668
1669 // Populate some relevant context for the EthernetPerformanceMonitoringHistoryData PM
1670 fecHistData["class_id"] = float32(classID)
1671 fecHistData["interval_end_time"] = float32(intervalEndTime)
1672
1673 metricInfo := mm.populateOnuMetricInfo(FecHistoryName, fecHistData)
1674
1675 logger.Debugw(ctx, "collecting data for FecPerformanceMonitoringHistoryData successful",
1676 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1677 return &metricInfo
1678}
1679
1680func (mm *onuMetricsManager) collectGemHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
1681 var mEnt *me.ManagedEntity
1682 var omciErr me.OmciErrors
1683 var classID me.ClassID
1684 var meAttributes me.AttributeValueMap
1685 logger.Debugw(ctx, "collecting data for GemPortNetworkCtpPerformanceMonitoringHistoryData", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1686 meParam := me.ParamData{EntityID: entityID}
1687 if mEnt, omciErr = me.NewGemPortNetworkCtpPerformanceMonitoringHistoryData(meParam); omciErr == nil || mEnt == nil || omciErr.GetError() != nil {
1688 logger.Errorw(ctx, "error creating me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1689 return nil
1690 }
1691 classID = me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID
1692
1693 intervalEndTime := -1
1694 gemHistData := make(map[string]float32)
1695 if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, gemHistData, &intervalEndTime); err != nil {
1696 return nil
1697 }
1698
1699 // Populate some relevant context for the GemPortNetworkCtpPerformanceMonitoringHistoryData PM
1700 gemHistData["class_id"] = float32(classID)
1701 gemHistData["interval_end_time"] = float32(intervalEndTime)
1702
1703 metricInfo := mm.populateOnuMetricInfo(GemPortHistoryName, gemHistData)
1704
1705 logger.Debugw(ctx, "collecting data for GemPortNetworkCtpPerformanceMonitoringHistoryData successful",
1706 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "metricInfo": metricInfo})
1707 return &metricInfo
1708}
1709
Girish Gowdrae0140f02021-02-02 16:55:09 -08001710// nolint: gocyclo
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001711func (mm *onuMetricsManager) populateEthernetBridgeHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
Girish Gowdrae0140f02021-02-02 16:55:09 -08001712 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMHistData map[string]float32, intervalEndTime *int) error {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001713 upstream := false
1714 if classID == me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID {
1715 upstream = true
1716 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001717 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1718 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1719 requestedAttributes["IntervalEndTime"] = 0
1720 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001721 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001722 select {
1723 case meAttributes = <-mm.l2PmChan:
1724 logger.Debugw(ctx, "received ethernet pm history data metrics",
1725 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "upstream": upstream, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001726 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdrae0140f02021-02-02 16:55:09 -08001727 logger.Errorw(ctx, "timeout waiting for omci-get response for ethernet pm history data",
1728 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "upstream": upstream, "entityID": entityID})
1729 // The metrics will be empty in this case
1730 return fmt.Errorf("timeout-during-l2-pm-collection-for-ethernet-bridge-history-%v", mm.pDeviceHandler.deviceID)
1731 }
1732 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001733 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1734 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-ethernet-bridge-history-%v", mm.pDeviceHandler.deviceID)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001735 }
1736 }
1737 for k := range EthernetBridgeHistory {
1738 // populate ethPMHistData only if metric key not already present (or populated), since it is possible that we populate
1739 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1740 if _, ok := ethPMHistData[k]; !ok {
1741 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001742 case "entity_id":
1743 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1744 ethPMHistData[k] = float32(val.(uint16))
1745 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001746 case "drop_events":
1747 if val, ok := meAttributes["DropEvents"]; ok && val != nil {
1748 ethPMHistData[k] = float32(val.(uint32))
1749 }
1750 case "octets":
1751 if val, ok := meAttributes["Octets"]; ok && val != nil {
1752 ethPMHistData[k] = float32(val.(uint32))
1753 }
1754 case "packets":
1755 if val, ok := meAttributes["Packets"]; ok && val != nil {
1756 ethPMHistData[k] = float32(val.(uint32))
1757 }
1758 case "broadcast_packets":
1759 if val, ok := meAttributes["BroadcastPackets"]; ok && val != nil {
1760 ethPMHistData[k] = float32(val.(uint32))
1761 }
1762 case "multicast_packets":
1763 if val, ok := meAttributes["MulticastPackets"]; ok && val != nil {
1764 ethPMHistData[k] = float32(val.(uint32))
1765 }
1766 case "crc_errored_packets":
1767 if val, ok := meAttributes["CrcErroredPackets"]; ok && val != nil {
1768 ethPMHistData[k] = float32(val.(uint32))
1769 }
1770 case "undersize_packets":
1771 if val, ok := meAttributes["UndersizePackets"]; ok && val != nil {
1772 ethPMHistData[k] = float32(val.(uint32))
1773 }
1774 case "oversize_packets":
1775 if val, ok := meAttributes["OversizePackets"]; ok && val != nil {
1776 ethPMHistData[k] = float32(val.(uint32))
1777 }
1778 case "64_octets":
1779 if val, ok := meAttributes["Packets64Octets"]; ok && val != nil {
1780 ethPMHistData[k] = float32(val.(uint32))
1781 }
1782 case "65_to_127_octets":
1783 if val, ok := meAttributes["Packets65To127Octets"]; ok && val != nil {
1784 ethPMHistData[k] = float32(val.(uint32))
1785 }
1786 case "128_to_255_octets":
1787 if val, ok := meAttributes["Packets128To255Octets"]; ok && val != nil {
1788 ethPMHistData[k] = float32(val.(uint32))
1789 }
1790 case "256_to_511_octets":
1791 if val, ok := meAttributes["Packets256To511Octets"]; ok && val != nil {
1792 ethPMHistData[k] = float32(val.(uint32))
1793 }
1794 case "512_to_1023_octets":
1795 if val, ok := meAttributes["Packets512To1023Octets"]; ok && val != nil {
1796 ethPMHistData[k] = float32(val.(uint32))
1797 }
1798 case "1024_to_1518_octets":
1799 if val, ok := meAttributes["Packets1024To1518Octets"]; ok && val != nil {
1800 ethPMHistData[k] = float32(val.(uint32))
1801 }
1802 default:
1803 // do nothing
1804 }
1805 }
1806 }
1807 return nil
1808}
1809
1810// nolint: gocyclo
1811func (mm *onuMetricsManager) populateEthernetUniHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1812 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMUniHistData map[string]float32, intervalEndTime *int) error {
1813 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1814 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1815 requestedAttributes["IntervalEndTime"] = 0
1816 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001817 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdrae0140f02021-02-02 16:55:09 -08001818 select {
1819 case meAttributes = <-mm.l2PmChan:
1820 logger.Debugw(ctx, "received ethernet uni history data metrics",
1821 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001822 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdrae0140f02021-02-02 16:55:09 -08001823 logger.Errorw(ctx, "timeout waiting for omci-get response for ethernet uni history data",
1824 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1825 // The metrics will be empty in this case
1826 return fmt.Errorf("timeout-during-l2-pm-collection-for-ethernet-uni-history-%v", mm.pDeviceHandler.deviceID)
1827 }
1828 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001829 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1830 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-ethernet-uni-history-%v", mm.pDeviceHandler.deviceID)
Girish Gowdrae0140f02021-02-02 16:55:09 -08001831 }
1832 }
1833 for k := range EthernetUniHistory {
1834 // populate ethPMUniHistData only if metric key not already present (or populated), since it is possible that we populate
1835 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1836 if _, ok := ethPMUniHistData[k]; !ok {
1837 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001838 case "entity_id":
1839 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1840 ethPMUniHistData[k] = float32(val.(uint16))
1841 }
Girish Gowdrae0140f02021-02-02 16:55:09 -08001842 case "fcs_errors":
1843 if val, ok := meAttributes["FcsErrors"]; ok && val != nil {
1844 ethPMUniHistData[k] = float32(val.(uint32))
1845 }
1846 case "excessive_collision_counter":
1847 if val, ok := meAttributes["ExcessiveCollisionCounter"]; ok && val != nil {
1848 ethPMUniHistData[k] = float32(val.(uint32))
1849 }
1850 case "late_collision_counter":
1851 if val, ok := meAttributes["LateCollisionCounter"]; ok && val != nil {
1852 ethPMUniHistData[k] = float32(val.(uint32))
1853 }
1854 case "frames_too_long":
1855 if val, ok := meAttributes["FramesTooLong"]; ok && val != nil {
1856 ethPMUniHistData[k] = float32(val.(uint32))
1857 }
1858 case "buffer_overflows_on_rx":
1859 if val, ok := meAttributes["BufferOverflowsOnReceive"]; ok && val != nil {
1860 ethPMUniHistData[k] = float32(val.(uint32))
1861 }
1862 case "buffer_overflows_on_tx":
1863 if val, ok := meAttributes["BufferOverflowsOnTransmit"]; ok && val != nil {
1864 ethPMUniHistData[k] = float32(val.(uint32))
1865 }
1866 case "single_collision_frame_counter":
1867 if val, ok := meAttributes["SingleCollisionFrameCounter"]; ok && val != nil {
1868 ethPMUniHistData[k] = float32(val.(uint32))
1869 }
1870 case "multiple_collisions_frame_counter":
1871 if val, ok := meAttributes["MultipleCollisionsFrameCounter"]; ok && val != nil {
1872 ethPMUniHistData[k] = float32(val.(uint32))
1873 }
1874 case "sqe_counter":
1875 if val, ok := meAttributes["SqeCounter"]; ok && val != nil {
1876 ethPMUniHistData[k] = float32(val.(uint32))
1877 }
1878 case "deferred_tx_counter":
1879 if val, ok := meAttributes["DeferredTransmissionCounter"]; ok && val != nil {
1880 ethPMUniHistData[k] = float32(val.(uint32))
1881 }
1882 case "internal_mac_tx_error_counter":
1883 if val, ok := meAttributes["InternalMacTransmitErrorCounter"]; ok && val != nil {
1884 ethPMUniHistData[k] = float32(val.(uint32))
1885 }
1886 case "carrier_sense_error_counter":
1887 if val, ok := meAttributes["CarrierSenseErrorCounter"]; ok && val != nil {
1888 ethPMUniHistData[k] = float32(val.(uint32))
1889 }
1890 case "alignment_error_counter":
1891 if val, ok := meAttributes["AlignmentErrorCounter"]; ok && val != nil {
1892 ethPMUniHistData[k] = float32(val.(uint32))
1893 }
1894 case "internal_mac_rx_error_counter":
1895 if val, ok := meAttributes["InternalMacReceiveErrorCounter"]; ok && val != nil {
1896 ethPMUniHistData[k] = float32(val.(uint32))
1897 }
1898 default:
1899 // do nothing
1900 }
1901 }
1902 }
1903 return nil
1904}
1905
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001906// nolint: gocyclo
1907func (mm *onuMetricsManager) populateFecHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1908 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, fecHistData map[string]float32, intervalEndTime *int) error {
1909 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1910 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1911 requestedAttributes["IntervalEndTime"] = 0
1912 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001913 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001914 select {
1915 case meAttributes = <-mm.l2PmChan:
1916 logger.Debugw(ctx, "received fec history data metrics",
1917 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001918 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001919 logger.Errorw(ctx, "timeout waiting for omci-get response for fec history data",
1920 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1921 // The metrics will be empty in this case
1922 return fmt.Errorf("timeout-during-l2-pm-collection-for-fec-history-%v", mm.pDeviceHandler.deviceID)
1923 }
1924 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
1925 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1926 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-fec-history-%v", mm.pDeviceHandler.deviceID)
1927 }
1928 }
1929 for k := range FecHistory {
1930 // populate fecHistData only if metric key not already present (or populated), since it is possible that we populate
1931 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1932 if _, ok := fecHistData[k]; !ok {
1933 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001934 case "entity_id":
1935 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1936 fecHistData[k] = float32(val.(uint16))
1937 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001938 case "corrected_bytes":
1939 if val, ok := meAttributes["CorrectedBytes"]; ok && val != nil {
1940 fecHistData[k] = float32(val.(uint32))
1941 }
1942 case "corrected_code_words":
1943 if val, ok := meAttributes["CorrectedCodeWords"]; ok && val != nil {
1944 fecHistData[k] = float32(val.(uint32))
1945 }
1946 case "uncorrectable_code_words":
1947 if val, ok := meAttributes["UncorrectableCodeWords"]; ok && val != nil {
1948 fecHistData[k] = float32(val.(uint32))
1949 }
1950 case "total_code_words":
1951 if val, ok := meAttributes["TotalCodeWords"]; ok && val != nil {
1952 fecHistData[k] = float32(val.(uint32))
1953 }
1954 case "fec_seconds":
1955 if val, ok := meAttributes["FecSeconds"]; ok && val != nil {
1956 fecHistData[k] = float32(val.(uint16))
1957 }
1958 default:
1959 // do nothing
1960 }
1961 }
1962 }
1963 return nil
1964}
1965
1966// nolint: gocyclo
1967func (mm *onuMetricsManager) populateGemPortMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
1968 meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, gemPortHistData map[string]float32, intervalEndTime *int) error {
1969 // Make sure "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
1970 if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
1971 requestedAttributes["IntervalEndTime"] = 0
1972 }
Girish Gowdra0b235842021-03-09 13:06:46 -08001973 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, classID, entityID, requestedAttributes, mm.pDeviceHandler.pOpenOnuAc.omciTimeout, true, mm.pAdaptFsm.commChan); meInstance != nil {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001974 select {
1975 case meAttributes = <-mm.l2PmChan:
1976 logger.Debugw(ctx, "received gem port history data metrics",
1977 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
Girish Gowdra0b235842021-03-09 13:06:46 -08001978 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001979 logger.Errorw(ctx, "timeout waiting for omci-get response for gem port history data",
1980 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID})
1981 // The metrics will be empty in this case
1982 return fmt.Errorf("timeout-during-l2-pm-collection-for-gemport-history-%v", mm.pDeviceHandler.deviceID)
1983 }
1984 // verify that interval end time has not changed during metric collection. If it changed, we abort the procedure
1985 if valid := mm.updateAndValidateIntervalEndTime(ctx, entityID, meAttributes, intervalEndTime); !valid {
1986 return fmt.Errorf("interval-end-time-changed-during-metric-collection-for-gemport-history-%v", mm.pDeviceHandler.deviceID)
1987 }
1988 }
1989 for k := range GemPortHistory {
1990 // populate gemPortHistData only if metric key not already present (or populated), since it is possible that we populate
1991 // the attributes in multiple iterations for a given L2 PM ME as there is a limit on the max OMCI GET payload size.
1992 if _, ok := gemPortHistData[k]; !ok {
1993 switch k {
Girish Gowdra0e533642021-03-02 22:02:51 -08001994 case "entity_id":
1995 if val, ok := meAttributes["ManagedEntityId"]; ok && val != nil {
1996 gemPortHistData[k] = float32(val.(uint16))
1997 }
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08001998 case "transmitted_gem_frames":
1999 if val, ok := meAttributes["TransmittedGemFrames"]; ok && val != nil {
2000 gemPortHistData[k] = float32(val.(uint32))
2001 }
2002 case "received_gem_frames":
2003 if val, ok := meAttributes["ReceivedGemFrames"]; ok && val != nil {
2004 gemPortHistData[k] = float32(val.(uint32))
2005 }
2006 case "received_payload_bytes":
2007 if val, ok := meAttributes["ReceivedPayloadBytes"]; ok && val != nil {
2008 gemPortHistData[k] = float32(val.(uint64))
2009 }
2010 case "transmitted_payload_bytes":
2011 if val, ok := meAttributes["TransmittedPayloadBytes"]; ok && val != nil {
2012 gemPortHistData[k] = float32(val.(uint64))
2013 }
2014 case "encryption_key_errors":
2015 if val, ok := meAttributes["EncryptionKeyErrors"]; ok && val != nil {
2016 gemPortHistData[k] = float32(val.(uint32))
2017 }
2018 default:
2019 // do nothing
2020 }
2021 }
2022 }
2023 return nil
2024}
2025
Girish Gowdrae0140f02021-02-02 16:55:09 -08002026func (mm *onuMetricsManager) handleOmciCreateResponseMessage(ctx context.Context, msg OmciMessage) error {
2027 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeCreateResponse)
2028 if msgLayer == nil {
2029 logger.Errorw(ctx, "omci Msg layer could not be detected for create response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2030 return fmt.Errorf("omci Msg layer could not be detected for create response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2031 }
2032 msgObj, msgOk := msgLayer.(*omci.CreateResponse)
2033 if !msgOk {
2034 logger.Errorw(ctx, "omci Msg layer could not be assigned for create response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2035 return fmt.Errorf("omci Msg layer could not be assigned for delete response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2036 }
2037 logger.Debugw(ctx, "OMCI create response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
2038 switch msgObj.EntityClass {
2039 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
2040 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002041 me.EthernetPerformanceMonitoringHistoryDataClassID,
2042 me.FecPerformanceMonitoringHistoryDataClassID,
2043 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -08002044 // If the result is me.InstanceExists it means the entity was already created. It is ok handled that as success
2045 if msgObj.Result == me.Success || msgObj.Result == me.InstanceExists {
2046 mm.l2PmCreateOrDeleteResponseChan <- true
2047 } else {
2048 logger.Warnw(ctx, "failed to create me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2049 mm.l2PmCreateOrDeleteResponseChan <- false
2050 }
2051 return nil
2052 default:
2053 logger.Errorw(ctx, "unhandled omci create response message",
2054 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2055 }
2056 return fmt.Errorf("unhandled-omci-create-response-message-%v", mm.pDeviceHandler.deviceID)
2057}
2058
2059func (mm *onuMetricsManager) handleOmciDeleteResponseMessage(ctx context.Context, msg OmciMessage) error {
2060 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeDeleteResponse)
2061 if msgLayer == nil {
2062 logger.Errorw(ctx, "omci Msg layer could not be detected for delete response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2063 return fmt.Errorf("omci Msg layer could not be detected for create response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2064 }
2065 msgObj, msgOk := msgLayer.(*omci.DeleteResponse)
2066 if !msgOk {
2067 logger.Errorw(ctx, "omci Msg layer could not be assigned for delete response - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2068 return fmt.Errorf("omci Msg layer could not be assigned for delete response - handling stopped: %s", mm.pDeviceHandler.deviceID)
2069 }
2070 logger.Debugw(ctx, "OMCI delete response Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
2071 switch msgObj.EntityClass {
2072 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID,
2073 me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID,
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002074 me.EthernetPerformanceMonitoringHistoryDataClassID,
2075 me.FecPerformanceMonitoringHistoryDataClassID,
2076 me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
Girish Gowdrae0140f02021-02-02 16:55:09 -08002077 // If the result is me.UnknownInstance it means the entity was already deleted. It is ok handled that as success
2078 if msgObj.Result == me.Success || msgObj.Result == me.UnknownInstance {
2079 mm.l2PmCreateOrDeleteResponseChan <- true
2080 } else {
2081 logger.Warnw(ctx, "failed to delete me", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2082 mm.l2PmCreateOrDeleteResponseChan <- false
2083 }
2084 return nil
2085 default:
2086 logger.Errorw(ctx, "unhandled omci delete response message",
2087 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
2088 }
2089 return fmt.Errorf("unhandled-omci-delete-response-message-%v", mm.pDeviceHandler.deviceID)
2090}
2091
2092func (mm *onuMetricsManager) generateTicks(ctx context.Context) {
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -07002093 mm.updateTickGenerationStatus(true)
Girish Gowdrae0140f02021-02-02 16:55:09 -08002094 for {
2095 select {
2096 case <-time.After(L2PmCollectionInterval * time.Second):
2097 go func() {
2098 if err := mm.pAdaptFsm.pFsm.Event(l2PmEventTick); err != nil {
2099 logger.Errorw(ctx, "error calling event", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "err": err})
2100 }
2101 }()
2102 case <-mm.stopTicks:
2103 logger.Infow(ctx, "stopping ticks", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -07002104 mm.updateTickGenerationStatus(false)
Girish Gowdrae0140f02021-02-02 16:55:09 -08002105 return
2106 }
2107 }
2108}
2109
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002110func (mm *onuMetricsManager) handleMetricsPublish(ctx context.Context, metricName string, metricInfoSlice []*voltha.MetricInformation) {
2111 // Publish metrics if it is valid
2112 if metricInfoSlice != nil {
2113 mm.publishMetrics(ctx, metricInfoSlice)
2114 } else {
2115 // If collectAttempts exceeds L2PmCollectAttempts then remove it from activeL2Pms
2116 // slice so that we do not collect data from that PM ME anymore
2117 mm.onuMetricsManagerLock.Lock()
2118 mm.groupMetricMap[metricName].collectAttempts++
2119 if mm.groupMetricMap[metricName].collectAttempts > L2PmCollectAttempts {
2120 mm.activeL2Pms = mm.removeIfFoundString(mm.activeL2Pms, metricName)
2121 }
2122 logger.Warnw(ctx, "state collect data - no metrics collected",
2123 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "metricName": metricName, "collectAttempts": mm.groupMetricMap[metricName].collectAttempts})
2124 mm.onuMetricsManagerLock.Unlock()
2125 }
2126}
2127
2128func (mm *onuMetricsManager) populateGroupSpecificMetrics(ctx context.Context, mEnt *me.ManagedEntity, classID me.ClassID, entityID uint16,
2129 meAttributes me.AttributeValueMap, data map[string]float32, intervalEndTime *int) error {
2130 var grpFunc groupMetricPopulateFunc
2131 switch classID {
2132 case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID, me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID:
2133 grpFunc = mm.populateEthernetBridgeHistoryMetrics
2134 case me.EthernetPerformanceMonitoringHistoryDataClassID:
2135 grpFunc = mm.populateEthernetUniHistoryMetrics
2136 case me.FecPerformanceMonitoringHistoryDataClassID:
2137 grpFunc = mm.populateFecHistoryMetrics
2138 case me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
2139 grpFunc = mm.populateGemPortMetrics
2140 default:
2141 return fmt.Errorf("unknown-classid-%v", classID)
2142 }
2143
2144 size := 0
2145 requestedAttributes := make(me.AttributeValueMap)
2146 for _, v := range mEnt.GetAttributeDefinitions() {
2147 if (v.Size + size) <= MaxL2PMGetPayLoadSize {
2148 requestedAttributes[v.Name] = v.DefValue
2149 size = v.Size + size
2150 } else { // We exceeded the allow omci get size
2151 // Let's collect the attributes via get now and collect remaining in the next iteration
2152 if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
2153 logger.Errorw(ctx, "error during metric collection",
2154 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "err": err})
2155 return err
2156 }
2157 size = 0 // reset size
2158 requestedAttributes = make(me.AttributeValueMap) // reset map
2159 }
2160 }
2161 // Collect the omci get attributes for the last bunch of attributes.
2162 if len(requestedAttributes) > 0 {
2163 if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
2164 logger.Errorw(ctx, "error during metric collection",
2165 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID, "err": err})
2166 return err
2167 }
2168 }
2169 return nil
2170}
2171
2172func (mm *onuMetricsManager) populateOnuMetricInfo(title string, data map[string]float32) voltha.MetricInformation {
2173 metricsContext := make(map[string]string)
2174 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
2175 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
2176 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
2177
2178 raisedTs := time.Now().Unix()
2179 mmd := voltha.MetricMetaData{
2180 Title: title,
2181 Ts: float64(raisedTs),
2182 Context: metricsContext,
2183 DeviceId: mm.pDeviceHandler.deviceID,
2184 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
2185 SerialNo: mm.pDeviceHandler.device.SerialNumber,
2186 }
2187
2188 // create slice of metrics given that there could be more than one VEIP instance
2189 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: data}
2190 return metricInfo
2191}
2192
2193func (mm *onuMetricsManager) updateAndValidateIntervalEndTime(ctx context.Context, entityID uint16, meAttributes me.AttributeValueMap, intervalEndTime *int) bool {
2194 valid := false
2195 if *intervalEndTime == -1 { // first time
2196 // Update the interval end time
2197 if val, ok := meAttributes["IntervalEndTime"]; ok && val != nil {
2198 *intervalEndTime = int(meAttributes["IntervalEndTime"].(uint8))
2199 valid = true
2200 }
2201 } else {
2202 var currIntervalEndTime int
2203 if val, ok := meAttributes["IntervalEndTime"]; ok && val != nil {
2204 currIntervalEndTime = int(meAttributes["IntervalEndTime"].(uint8))
2205 }
2206 if currIntervalEndTime != *intervalEndTime { // interval end time changed during metric collection
2207 logger.Errorw(ctx, "interval end time changed during metrics collection for ethernet pm history data",
2208 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "entityID": entityID,
2209 "currIntervalEndTime": *intervalEndTime, "newIntervalEndTime": currIntervalEndTime})
2210 } else {
2211 valid = true
2212 }
2213 }
2214 return valid
2215}
2216
2217func (mm *onuMetricsManager) waitForResponseOrTimeout(ctx context.Context, create bool, instID uint16, meClassName string) bool {
2218 logger.Debugw(ctx, "waitForResponseOrTimeout", log.Fields{"create": create, "instID": instID, "meClassName": meClassName})
2219 select {
2220 case resp := <-mm.l2PmCreateOrDeleteResponseChan:
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002221 logger.Debugw(ctx, "received l2 pm me response",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002222 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "resp": resp, "create": create, "meClassName": meClassName, "instID": instID})
2223 return resp
Girish Gowdra0b235842021-03-09 13:06:46 -08002224 case <-time.After(time.Duration(mm.pDeviceHandler.pOpenOnuAc.omciTimeout) * time.Second):
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002225 logger.Errorw(ctx, "timeout waiting for l2 pm me response",
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002226 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "resp": false, "create": create, "meClassName": meClassName, "instID": instID})
2227 }
2228 return false
2229}
2230
2231func (mm *onuMetricsManager) initializeGroupMetric(grpMtrcs map[string]voltha.PmConfig_PmType, grpName string, grpEnabled bool, grpFreq uint32) {
2232 var pmConfigSlice []*voltha.PmConfig
2233 for k, v := range grpMtrcs {
Girish Gowdra0e533642021-03-02 22:02:51 -08002234 pmConfigSlice = append(pmConfigSlice,
2235 &voltha.PmConfig{
2236 Name: k,
2237 Type: v,
2238 Enabled: grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
2239 SampleFreq: grpFreq})
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002240 }
2241 groupMetric := voltha.PmGroupConfig{
2242 GroupName: grpName,
2243 Enabled: grpEnabled && mm.pDeviceHandler.pOpenOnuAc.metricsEnabled,
2244 GroupFreq: grpFreq,
2245 Metrics: pmConfigSlice,
2246 }
2247 mm.pDeviceHandler.pmConfigs.Groups = append(mm.pDeviceHandler.pmConfigs.Groups, &groupMetric)
2248
2249}
2250
2251func (mm *onuMetricsManager) initializeL2PmFsm(ctx context.Context, aCommChannel chan Message) error {
2252 mm.pAdaptFsm = NewAdapterFsm("L2PmFSM", mm.pDeviceHandler.deviceID, aCommChannel)
2253 if mm.pAdaptFsm == nil {
2254 logger.Errorw(ctx, "L2PMFsm AdapterFsm could not be instantiated!!", log.Fields{
2255 "device-id": mm.pDeviceHandler.deviceID})
2256 return fmt.Errorf("nil-adapter-fsm")
2257 }
2258 // L2 PM FSM related state machine
2259 mm.pAdaptFsm.pFsm = fsm.NewFSM(
2260 l2PmStNull,
2261 fsm.Events{
2262 {Name: l2PmEventInit, Src: []string{l2PmStNull}, Dst: l2PmStStarting},
2263 {Name: l2PmEventTick, Src: []string{l2PmStStarting}, Dst: l2PmStSyncTime},
2264 {Name: l2PmEventTick, Src: []string{l2PmStIdle, l2PmEventDeleteMe, l2PmEventAddMe}, Dst: l2PmStCollectData},
2265 {Name: l2PmEventSuccess, Src: []string{l2PmStSyncTime, l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStIdle},
2266 {Name: l2PmEventFailure, Src: []string{l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStIdle},
2267 {Name: l2PmEventFailure, Src: []string{l2PmStSyncTime}, Dst: l2PmStSyncTime},
2268 {Name: l2PmEventAddMe, Src: []string{l2PmStIdle}, Dst: l2PmStCreatePmMe},
2269 {Name: l2PmEventDeleteMe, Src: []string{l2PmStIdle}, Dst: l2PmStDeletePmMe},
2270 {Name: l2PmEventStop, Src: []string{l2PmStNull, l2PmStStarting, l2PmStSyncTime, l2PmStIdle, l2PmStCreatePmMe, l2PmStDeletePmMe, l2PmStCollectData}, Dst: l2PmStNull},
2271 },
2272 fsm.Callbacks{
2273 "enter_state": func(e *fsm.Event) { mm.pAdaptFsm.logFsmStateChange(ctx, e) },
2274 "enter_" + l2PmStNull: func(e *fsm.Event) { mm.l2PMFsmNull(ctx, e) },
2275 "enter_" + l2PmStIdle: func(e *fsm.Event) { mm.l2PMFsmIdle(ctx, e) },
2276 "enter_" + l2PmStStarting: func(e *fsm.Event) { mm.l2PMFsmStarting(ctx, e) },
2277 "enter_" + l2PmStSyncTime: func(e *fsm.Event) { mm.l2PMFsmSyncTime(ctx, e) },
2278 "enter_" + l2PmStCollectData: func(e *fsm.Event) { mm.l2PmFsmCollectData(ctx, e) },
2279 "enter_" + l2PmStCreatePmMe: func(e *fsm.Event) { mm.l2PmFsmCreatePM(ctx, e) },
2280 "enter_" + l2PmStDeletePmMe: func(e *fsm.Event) { mm.l2PmFsmDeletePM(ctx, e) },
2281 },
2282 )
2283 return nil
2284}
2285
2286func (mm *onuMetricsManager) initializeAllGroupMetrics() {
2287 mm.pDeviceHandler.pmConfigs = &voltha.PmConfigs{}
2288 mm.pDeviceHandler.pmConfigs.Id = mm.pDeviceHandler.deviceID
2289 mm.pDeviceHandler.pmConfigs.DefaultFreq = DefaultMetricCollectionFrequency
2290 mm.pDeviceHandler.pmConfigs.Grouped = GroupMetricEnabled
2291 mm.pDeviceHandler.pmConfigs.FreqOverride = DefaultFrequencyOverrideEnabled
2292
2293 // Populate group metrics.
2294 // Lets populate irrespective of GroupMetricEnabled is true or not.
2295 // The group metrics collection will decided on this flag later
2296
2297 mm.initializeGroupMetric(OpticalPowerGroupMetrics, OpticalPowerGroupMetricName,
2298 OpticalPowerGroupMetricEnabled, OpticalPowerMetricGroupCollectionFrequency)
2299
2300 mm.initializeGroupMetric(UniStatusGroupMetrics, UniStatusGroupMetricName,
2301 UniStatusGroupMetricEnabled, UniStatusMetricGroupCollectionFrequency)
2302
2303 // classical l2 pm counter start
2304
2305 mm.initializeGroupMetric(EthernetBridgeHistory, EthernetBridgeHistoryName,
2306 EthernetBridgeHistoryEnabled, EthernetBridgeHistoryFrequency)
2307
2308 mm.initializeGroupMetric(EthernetUniHistory, EthernetUniHistoryName,
2309 EthernetUniHistoryEnabled, EthernetUniHistoryFrequency)
2310
2311 mm.initializeGroupMetric(FecHistory, FecHistoryName,
2312 FecHistoryEnabled, FecHistoryFrequency)
2313
2314 mm.initializeGroupMetric(GemPortHistory, GemPortHistoryName,
2315 GemPortHistoryEnabled, GemPortHistoryFrequency)
2316
2317 // classical l2 pm counter end
2318
2319 // Add standalone metric (if present) after this (will be added to dh.pmConfigs.Metrics)
2320}
2321
2322func (mm *onuMetricsManager) populateLocalGroupMetricData(ctx context.Context) {
2323 // Populate local group metric structures
2324 for _, g := range mm.pDeviceHandler.pmConfigs.Groups {
2325 mm.groupMetricMap[g.GroupName] = &groupMetric{
2326 groupName: g.GroupName,
2327 enabled: g.Enabled,
2328 frequency: g.GroupFreq,
2329 }
2330 switch g.GroupName {
2331 case OpticalPowerGroupMetricName:
2332 mm.groupMetricMap[g.GroupName].metricMap = OpticalPowerGroupMetrics
2333 case UniStatusGroupMetricName:
2334 mm.groupMetricMap[g.GroupName].metricMap = UniStatusGroupMetrics
2335 case EthernetBridgeHistoryName:
2336 mm.groupMetricMap[g.GroupName].metricMap = EthernetBridgeHistory
2337 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2338 case EthernetUniHistoryName:
2339 mm.groupMetricMap[g.GroupName].metricMap = EthernetUniHistory
2340 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2341 case FecHistoryName:
2342 mm.groupMetricMap[g.GroupName].metricMap = FecHistory
2343 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2344 case GemPortHistoryName:
2345 mm.groupMetricMap[g.GroupName].metricMap = GemPortHistory
2346 mm.groupMetricMap[g.GroupName].isL2PMCounter = true
2347 default:
2348 logger.Errorw(ctx, "unhandled-group-name", log.Fields{"groupName": g.GroupName})
2349 }
2350 }
2351
2352 // Populate local standalone metric structures
2353 for _, m := range mm.pDeviceHandler.pmConfigs.Metrics {
2354 mm.standaloneMetricMap[m.Name] = &standaloneMetric{
2355 metricName: m.Name,
2356 enabled: m.Enabled,
2357 frequency: m.SampleFreq,
2358 }
2359 switch m.Name {
2360 // None exist as of now. Add when available.
2361 default:
2362 logger.Errorw(ctx, "unhandled-metric-name", log.Fields{"metricName": m.Name})
2363 }
2364 }
2365}
2366
2367func (mm *onuMetricsManager) AddGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
2368 mm.onuMetricsManagerLock.Lock()
2369 defer mm.onuMetricsManagerLock.Unlock()
2370 // mark the instance for addition
Girish Gowdra0e533642021-03-02 22:02:51 -08002371 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002372 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002373 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002374
2375 mm.l2PmToAdd = mm.appendIfMissingString(mm.l2PmToAdd, GemPortHistoryName)
2376 // We do not need to remove from l2PmToDelete slice as we could have Add and Delete of
2377 // GemPortPerfHistory ME simultaneously for different instances of the ME.
2378 // The creation or deletion of an instance is decided based on its presence in gemPortNCTPPerfHistInstToDelete or
2379 // gemPortNCTPPerfHistInstToAdd slice
2380}
2381
2382func (mm *onuMetricsManager) RemoveGemPortForPerfMonitoring(gemPortNTPInstID uint16) {
2383 mm.onuMetricsManagerLock.Lock()
2384 defer mm.onuMetricsManagerLock.Unlock()
Girish Gowdra0e533642021-03-02 22:02:51 -08002385 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002386 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002387 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, gemPortNTPInstID)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002388
2389 mm.l2PmToDelete = mm.appendIfMissingString(mm.l2PmToDelete, GemPortHistoryName)
2390 // We do not need to remove from l2PmToAdd slice as we could have Add and Delete of
2391 // GemPortPerfHistory ME simultaneously for different instances of the ME.
2392 // The creation or deletion of an instance is decided based on its presence in gemPortNCTPPerfHistInstToDelete or
2393 // gemPortNCTPPerfHistInstToAdd slice
2394}
2395
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002396func (mm *onuMetricsManager) updateGemPortNTPInstanceToAddForPerfMonitoring(ctx context.Context) {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002397 if mm.pDeviceHandler.pOnuTP != nil {
2398 gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002399 // NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002400 for _, v := range gemPortInstIDs {
2401 // mark the instance for addition
Girish Gowdra0e533642021-03-02 22:02:51 -08002402 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002403 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToDelete slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002404 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002405 }
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002406 logger.Debugw(ctx, "updateGemPortNTPInstanceToAddForPerfMonitoring",
Girish Gowdra0e533642021-03-02 22:02:51 -08002407 log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002408 }
2409}
2410
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002411func (mm *onuMetricsManager) updateGemPortNTPInstanceToDeleteForPerfMonitoring(ctx context.Context) {
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002412 if mm.pDeviceHandler.pOnuTP != nil {
2413 gemPortInstIDs := mm.pDeviceHandler.pOnuTP.GetAllBidirectionalGemPortIDsForOnu()
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002414 // NOTE: It is expected that caller of this function has acquired the required mutex for synchronization purposes
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002415 for _, v := range gemPortInstIDs {
Girish Gowdra0e533642021-03-02 22:02:51 -08002416 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002417 // If the instance presence toggles too soon, we need to remove it from gemPortNCTPPerfHistInstToAdd slice
Girish Gowdra0e533642021-03-02 22:02:51 -08002418 mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd = mm.removeIfFoundUint16(mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, v)
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002419 }
2420 }
Girish Gowdra36ccf7d2021-02-25 20:42:51 -08002421 logger.Debugw(ctx, "updateGemPortNTPInstanceToDeleteForPerfMonitoring",
Girish Gowdra0e533642021-03-02 22:02:51 -08002422 log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "gemToAdd": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToAdd, "gemToDel": mm.groupMetricMap[GemPortHistoryName].pmMEData.InstancesToDelete})
2423}
2424
2425// restorePmData restores any PM data available on the KV store to local cache
2426func (mm *onuMetricsManager) restorePmData(ctx context.Context) error {
2427 logger.Debugw(ctx, "restorePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2428 if mm.pmKvStore == nil {
2429 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2430 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2431 }
2432 var errorsList []error
2433 for groupName, group := range mm.groupMetricMap {
2434 group.pmMEData = &pmMEData{}
2435 Value, err := mm.pmKvStore.Get(ctx, groupName)
2436 if err == nil {
2437 if Value != nil {
2438 logger.Debugw(ctx, "PM data read",
2439 log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
2440 tmpBytes, _ := kvstore.ToByte(Value.Value)
2441
2442 if err = json.Unmarshal(tmpBytes, &group.pmMEData); err != nil {
2443 logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
2444 errorsList = append(errorsList, fmt.Errorf(fmt.Sprintf("unable-to-unmarshal-PM-data-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName)))
2445 continue
2446 }
2447 logger.Debugw(ctx, "restorePmData - success", log.Fields{"pmData": group.pmMEData, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2448 } else {
2449 logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2450 continue
2451 }
2452 } else {
2453 logger.Errorw(ctx, "restorePmData - fail", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "err": err})
2454 errorsList = append(errorsList, fmt.Errorf(fmt.Sprintf("unable-to-read-from-KVstore-%s-for-group-%s", mm.pDeviceHandler.deviceID, groupName)))
2455 continue
2456 }
2457 }
2458 if len(errorsList) > 0 {
2459 return fmt.Errorf("errors-restoring-pm-data-for-one-or-more-groups--errors:%v", errorsList)
2460 }
2461 logger.Debugw(ctx, "restorePmData - complete success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2462 return nil
2463}
2464
2465// getPmData gets pmMEData from cache. Since we have write through cache implementation for pmMEData,
2466// the data must be available in cache.
2467// Note, it is expected that caller of this function manages the required synchronization (like using locks etc.).
2468func (mm *onuMetricsManager) getPmData(ctx context.Context, groupName string) (*pmMEData, error) {
2469 if grp, ok := mm.groupMetricMap[groupName]; ok {
2470 return grp.pmMEData, nil
2471 }
2472 // Data not in cache, try to fetch from kv store.
2473 data := &pmMEData{}
2474 if mm.pmKvStore == nil {
2475 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2476 return data, fmt.Errorf("pmKvStore not set. device-id - %s", mm.pDeviceHandler.deviceID)
2477 }
2478 Value, err := mm.pmKvStore.Get(ctx, groupName)
2479 if err == nil {
2480 if Value != nil {
2481 logger.Debugw(ctx, "PM data read",
2482 log.Fields{"Key": Value.Key, "device-id": mm.pDeviceHandler.deviceID})
2483 tmpBytes, _ := kvstore.ToByte(Value.Value)
2484
2485 if err = json.Unmarshal(tmpBytes, data); err != nil {
2486 logger.Errorw(ctx, "unable to unmarshal PM data", log.Fields{"error": err, "device-id": mm.pDeviceHandler.deviceID})
2487 return data, err
2488 }
2489 logger.Debugw(ctx, "PM data", log.Fields{"pmData": data, "groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2490 } else {
2491 logger.Debugw(ctx, "no PM data found", log.Fields{"groupName": groupName, "device-id": mm.pDeviceHandler.deviceID})
2492 return data, err
2493 }
2494 } else {
2495 logger.Errorw(ctx, "unable to read from KVstore", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2496 return data, err
2497 }
2498
2499 return data, nil
2500}
2501
2502// updatePmData update pmMEData to store. It is write through cache, i.e., write to cache first and then update store
2503func (mm *onuMetricsManager) updatePmData(ctx context.Context, groupName string, meInstanceID uint16, pmAction string) error {
2504 logger.Debugw(ctx, "updatePmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "entityID": meInstanceID, "pmAction": pmAction})
2505 mm.onuMetricsManagerLock.Lock()
2506 defer mm.onuMetricsManagerLock.Unlock()
2507
2508 if mm.pmKvStore == nil {
2509 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2510 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2511 }
2512
2513 pmMEData, err := mm.getPmData(ctx, groupName)
2514 if err != nil || pmMEData == nil {
2515 // error already logged in called function.
2516 return err
2517 }
2518 switch pmAction {
2519 case cPmAdd:
2520 pmMEData.InstancesToAdd = mm.appendIfMissingUnt16(pmMEData.InstancesToAdd, meInstanceID)
2521 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2522 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2523 case cPmAdded:
2524 pmMEData.InstancesActive = mm.appendIfMissingUnt16(pmMEData.InstancesActive, meInstanceID)
2525 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2526 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2527 case cPmRemove:
2528 pmMEData.InstancesToDelete = mm.appendIfMissingUnt16(pmMEData.InstancesToDelete, meInstanceID)
2529 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2530 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2531 case cPmRemoved:
2532 pmMEData.InstancesToDelete = mm.removeIfFoundUint16(pmMEData.InstancesToDelete, meInstanceID)
2533 pmMEData.InstancesToAdd = mm.removeIfFoundUint16(pmMEData.InstancesToAdd, meInstanceID)
2534 pmMEData.InstancesActive = mm.removeIfFoundUint16(pmMEData.InstancesActive, meInstanceID)
2535 default:
2536 logger.Errorw(ctx, "unknown pm action", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "pmAction": pmAction, "groupName": groupName})
2537 return fmt.Errorf(fmt.Sprintf("unknown-pm-action-deviceid-%s-groupName-%s-pmaction-%s", mm.pDeviceHandler.deviceID, groupName, pmAction))
2538 }
2539 // write through cache
2540 mm.groupMetricMap[groupName].pmMEData = pmMEData
2541
2542 Value, err := json.Marshal(*pmMEData)
2543 if err != nil {
2544 logger.Errorw(ctx, "unable to marshal PM data", log.Fields{"groupName": groupName, "pmAction": pmAction, "pmData": *pmMEData, "err": err})
2545 return err
2546 }
2547 // Update back to kv store
2548 if err = mm.pmKvStore.Put(ctx, groupName, Value); err != nil {
2549 logger.Errorw(ctx, "unable to put PM data to kv store", log.Fields{"groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction, "err": err})
2550 return err
2551 }
2552 logger.Debugw(ctx, "updatePmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": groupName, "pmData": *pmMEData, "pmAction": pmAction})
2553
2554 return nil
2555}
2556
2557// clearPmGroupData cleans PM Group data from store
2558func (mm *onuMetricsManager) clearPmGroupData(ctx context.Context) error {
2559 mm.onuMetricsManagerLock.Lock()
2560 defer mm.onuMetricsManagerLock.Unlock()
2561 logger.Debugw(ctx, "clearPmGroupData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2562 if mm.pmKvStore == nil {
2563 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2564 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2565 }
2566
2567 for n := range mm.groupMetricMap {
2568 if err := mm.pmKvStore.Delete(ctx, n); err != nil {
2569 logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "groupName": n, "err": err})
2570 // do not abort this procedure. continue to delete next group.
2571 } else {
2572 logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": n})
2573 }
2574 }
2575
2576 return nil
2577}
2578
2579// clearAllPmData clears all PM data associated with the device from KV store
2580func (mm *onuMetricsManager) clearAllPmData(ctx context.Context) error {
2581 mm.onuMetricsManagerLock.Lock()
2582 defer mm.onuMetricsManagerLock.Unlock()
2583 logger.Debugw(ctx, "clearAllPmData - start", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2584 if mm.pmKvStore == nil {
2585 logger.Errorw(ctx, "pmKvStore not set - abort", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2586 return fmt.Errorf(fmt.Sprintf("pmKvStore-not-set-abort-%s", mm.pDeviceHandler.deviceID))
2587 }
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +00002588 var value error
2589 for n := range mm.groupMetricMap {
2590 if err := mm.pmKvStore.Delete(ctx, n); err != nil {
2591 logger.Errorw(ctx, "clearPmGroupData - fail", log.Fields{"deviceID": mm.pDeviceHandler.deviceID, "groupName": n, "err": err})
2592 value = err
2593 // do not abort this procedure - continue to delete next group.
2594 } else {
2595 logger.Debugw(ctx, "clearPmGroupData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "groupName": n})
2596 }
Girish Gowdra0e533642021-03-02 22:02:51 -08002597 }
Holger Hildebrandt44a0d4f2021-03-18 14:00:54 +00002598 if value == nil {
2599 logger.Debugw(ctx, "clearAllPmData - success", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
2600 }
2601 return value
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002602}
2603
Girish Gowdra7b0ee5c2021-03-19 21:48:15 -07002604func (mm *onuMetricsManager) updateOmciProcessingStatus(status bool) {
2605 mm.onuMetricsManagerLock.Lock()
2606 defer mm.onuMetricsManagerLock.Unlock()
2607 mm.omciProcessingActive = status
2608}
2609
2610func (mm *onuMetricsManager) updateTickGenerationStatus(status bool) {
2611 mm.onuMetricsManagerLock.Lock()
2612 defer mm.onuMetricsManagerLock.Unlock()
2613 mm.tickGenerationActive = status
2614}
2615
2616func (mm *onuMetricsManager) getOmciProcessingStatus() bool {
2617 mm.onuMetricsManagerLock.RLock()
2618 defer mm.onuMetricsManagerLock.RUnlock()
2619 return mm.omciProcessingActive
2620}
2621
2622func (mm *onuMetricsManager) getTickGenerationStatus() bool {
2623 mm.onuMetricsManagerLock.RLock()
2624 defer mm.onuMetricsManagerLock.RUnlock()
2625 return mm.tickGenerationActive
2626}
2627
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002628func (mm *onuMetricsManager) appendIfMissingString(slice []string, n string) []string {
Girish Gowdrae0140f02021-02-02 16:55:09 -08002629 for _, ele := range slice {
2630 if ele == n {
2631 return slice
2632 }
2633 }
2634 return append(slice, n)
2635}
2636
Girish Gowdra5c5aaf42021-02-17 19:40:50 -08002637func (mm *onuMetricsManager) removeIfFoundString(slice []string, n string) []string {
2638 for i, ele := range slice {
2639 if ele == n {
2640 return append(slice[:i], slice[i+1:]...)
2641 }
2642 }
2643 return slice
2644}
2645
2646func (mm *onuMetricsManager) appendIfMissingUnt16(slice []uint16, n uint16) []uint16 {
2647 for _, ele := range slice {
2648 if ele == n {
2649 return slice
2650 }
2651 }
2652 return append(slice, n)
2653}
2654
2655func (mm *onuMetricsManager) removeIfFoundUint16(slice []uint16, n uint16) []uint16 {
Girish Gowdrae0140f02021-02-02 16:55:09 -08002656 for i, ele := range slice {
2657 if ele == n {
2658 return append(slice[:i], slice[i+1:]...)
2659 }
2660 }
2661 return slice
Girish Gowdrae09a6202021-01-12 18:10:59 -08002662}
Girish Gowdrae20a4f62021-03-09 16:06:23 -08002663
2664func (mm *onuMetricsManager) twosComplementToSignedInt16(val uint16) int16 {
2665 var uint16MsbMask uint16 = 0x8000
2666 if val&uint16MsbMask == uint16MsbMask {
2667 return int16(^val+1) * -1
2668 }
2669
2670 return int16(val)
2671}
2672
/* // These are needed in the future
2674
2675func (mm *onuMetricsManager) twosComplementToSignedInt32(val uint32) int32 {
2676 var uint32MsbMask uint32 = 0x80000000
2677 if val & uint32MsbMask == uint32MsbMask {
2678 return int32(^val + 1) * -1
2679 }
2680
2681 return int32(val)
2682}
2683
2684func (mm *onuMetricsManager) twosComplementToSignedInt64(val uint64) int64 {
2685 var uint64MsbMask uint64 = 0x8000000000000000
2686 if val & uint64MsbMask == uint64MsbMask {
2687 return int64(^val + 1) * -1
2688 }
2689
2690 return int64(val)
2691}
2692
2693*/