blob: 3e432753e62e77550fb34443ff10b042387b9f8a [file] [log] [blame]
Girish Gowdrae09a6202021-01-12 18:10:59 -08001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "github.com/opencord/omci-lib-go"
25 me "github.com/opencord/omci-lib-go/generated"
26 "github.com/opencord/voltha-lib-go/v4/pkg/log"
27 "github.com/opencord/voltha-protos/v4/go/voltha"
28 "time"
29)
30
31type onuMetricsManager struct {
32 pDeviceHandler *deviceHandler
33
34 commMetricsChan chan Message
35 opticalMetricsChan chan me.AttributeValueMap
36 uniStatusMetricsChan chan me.AttributeValueMap
37
38 stopProcessingOmciResponses chan bool
39}
40
41// newonuMetricsManager returns a new instance of the newonuMetricsManager
42func newonuMetricsManager(ctx context.Context, dh *deviceHandler) *onuMetricsManager {
43
44 var metricsManager onuMetricsManager
45 logger.Debugw(ctx, "init-onuMetricsManager", log.Fields{"device-id": dh.deviceID})
46 metricsManager.pDeviceHandler = dh
47
48 metricsManager.commMetricsChan = make(chan Message)
49 metricsManager.opticalMetricsChan = make(chan me.AttributeValueMap)
50 metricsManager.uniStatusMetricsChan = make(chan me.AttributeValueMap)
51 metricsManager.stopProcessingOmciResponses = make(chan bool)
52
53 return &metricsManager
54}
55
56func (mm *onuMetricsManager) collectOpticalMetrics(ctx context.Context) []*voltha.MetricInformation {
57 logger.Debugw(ctx, "collectOpticalMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
58 var metricInfoSlice []*voltha.MetricInformation
59 metricsContext := make(map[string]string)
60 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
61 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
62 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
63
64 raisedTs := time.Now().UnixNano()
65 mmd := voltha.MetricMetaData{
66 Title: "OpticalMetrics",
67 Ts: float64(raisedTs),
68 Context: metricsContext,
69 DeviceId: mm.pDeviceHandler.deviceID,
70 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
71 SerialNo: mm.pDeviceHandler.device.SerialNumber,
72 }
73
74 enabledMetrics := make([]string, 0)
75 // Populate enabled metrics
76 for _, m := range mm.pDeviceHandler.pmMetrics.ToPmConfigs().Metrics {
77 if m.Enabled {
78 enabledMetrics = append(enabledMetrics, m.Name)
79 }
80 }
81 logger.Debugw(ctx, "enabled metrics", log.Fields{"enabledMetrics": enabledMetrics})
82 // get the ANI-G instance IDs
83 anigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.AniGClassID)
84loop:
85 for _, anigInstID := range anigInstKeys {
86 var meAttributes me.AttributeValueMap
87 opticalMetrics := make(map[string]float32)
88 // Get the ANI-G instance optical power attributes
89 requestedAttributes := me.AttributeValueMap{"OpticalSignalLevel": 0, "TransmitOpticalLevel": 0}
90 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.AniGClassID, anigInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
91 select {
92 case meAttributes = <-mm.opticalMetricsChan:
93 logger.Debugw(ctx, "received optical metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
94 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
95 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
96 // The metrics will be empty in this case
97 break loop
98 }
99 // Populate metric only if it was enabled.
100 for _, v := range enabledMetrics {
101 switch v {
102 case "transmit_power":
103 opticalMetrics["transmit_power"] = float32(meAttributes["TransmitOpticalLevel"].(uint16))
104 case "receive_power":
105 opticalMetrics["receive_power"] = float32(meAttributes["OpticalSignalLevel"].(uint16))
106 default:
107 // do nothing
108 }
109 }
110 }
111 // create slice of metrics given that there could be more than one ANI-G instance and
112 // optical metrics are collected per ANI-G instance
113 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: opticalMetrics}
114 metricInfoSlice = append(metricInfoSlice, &metricInfo)
115 }
116
117 return metricInfoSlice
118}
119
120// Note: UNI status does not seem to be a metric, but this is being treated as metric in Python implementation
121// nolint: gocyclo
122func (mm *onuMetricsManager) collectUniStatusMetrics(ctx context.Context) []*voltha.MetricInformation {
123 logger.Debugw(ctx, "collectUniStatusMetrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
124 var metricInfoSlice []*voltha.MetricInformation
125 metricsContext := make(map[string]string)
126 metricsContext["onuID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.OnuId)
127 metricsContext["intfID"] = fmt.Sprintf("%d", mm.pDeviceHandler.device.ProxyAddress.ChannelId)
128 metricsContext["devicetype"] = mm.pDeviceHandler.DeviceType
129
130 raisedTs := time.Now().UnixNano()
131 mmd := voltha.MetricMetaData{
132 Title: "UniStatus", // Is this ok to hard code?
133 Ts: float64(raisedTs),
134 Context: metricsContext,
135 DeviceId: mm.pDeviceHandler.deviceID,
136 LogicalDeviceId: mm.pDeviceHandler.logicalDeviceID,
137 SerialNo: mm.pDeviceHandler.device.SerialNumber,
138 }
139
140 enabledMetrics := make([]string, 0)
141 // Populate enabled metrics
142 for _, m := range mm.pDeviceHandler.pmMetrics.ToPmConfigs().Metrics {
143 if m.Enabled {
144 enabledMetrics = append(enabledMetrics, m.Name)
145 }
146 }
147 logger.Debugw(ctx, "enabled metrics", log.Fields{"enabledMetrics": enabledMetrics})
148
149 // get the UNI-G instance IDs
150 unigInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.UniGClassID)
151loop1:
152 for _, unigInstID := range unigInstKeys {
153 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
154 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
155 unigMetrics := make(map[string]float32)
156 var meAttributes me.AttributeValueMap
157 // Get the UNI-G instance optical power attributes
158 requestedAttributes := me.AttributeValueMap{"AdministrativeState": 0}
159 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.UniGClassID, unigInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
160 // Wait for metrics or timeout
161 select {
162 case meAttributes = <-mm.uniStatusMetricsChan:
163 logger.Debugw(ctx, "received uni-g metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
164 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
165 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
166 // The metrics could be empty in this case
167 break loop1
168 }
169 // Populate metric only if it was enabled.
170 for _, v := range enabledMetrics {
171 switch v {
172 case "uni_admin_state":
173 unigMetrics["uni_admin_state"] = float32(meAttributes["AdministrativeState"].(byte))
174 default:
175 // do nothing
176 }
177 }
178 // create slice of metrics given that there could be more than one UNI-G instance
179 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: unigMetrics}
180 metricInfoSlice = append(metricInfoSlice, &metricInfo)
181 }
182 }
183
184 // get the PPTP instance IDs
185 pptpInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.PhysicalPathTerminationPointEthernetUniClassID)
186loop2:
187 for _, pptpInstID := range pptpInstKeys {
188 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
189 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
190 var meAttributes me.AttributeValueMap
191 pptpMetrics := make(map[string]float32)
192
193 requestedAttributes := me.AttributeValueMap{"SensedType": 0, "OperationalState": 0, "AdministrativeState": 0}
194 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.PhysicalPathTerminationPointEthernetUniClassID, pptpInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
195 // Wait for metrics or timeout
196 select {
197 case meAttributes = <-mm.uniStatusMetricsChan:
198 logger.Debugw(ctx, "received pptp metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
199 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
200 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
201 // The metrics could be empty in this case
202 break loop2
203 }
204
205 // Populate metric only if it was enabled.
206 for _, v := range enabledMetrics {
207 switch v {
208 case "ethernet_type":
209 pptpMetrics["ethernet_type"] = float32(meAttributes["SensedType"].(byte))
210 case "oper_status":
211 pptpMetrics["oper_status"] = float32(meAttributes["OperationalState"].(byte))
212 case "uni_admin_state":
213 pptpMetrics["uni_admin_state"] = float32(meAttributes["AdministrativeState"].(byte))
214 default:
215 // do nothing
216 }
217 }
218 }
219 // create slice of metrics given that there could be more than one PPTP instance and
220 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: pptpMetrics}
221 metricInfoSlice = append(metricInfoSlice, &metricInfo)
222 }
223
224 // get the VEIP instance IDs
225 veipInstKeys := mm.pDeviceHandler.pOnuOmciDevice.pOnuDB.getSortedInstKeys(ctx, me.VirtualEthernetInterfacePointClassID)
226loop3:
227 for _, veipInstID := range veipInstKeys {
228 // TODO: Include additional information in the voltha.MetricMetaData - like portno, uni-id, instance-id
229 // to uniquely identify this ME instance and also to correlate the ME instance to physical instance
230 var meAttributes me.AttributeValueMap
231 pptpMetrics := make(map[string]float32)
232
233 requestedAttributes := me.AttributeValueMap{"OperationalState": 0, "AdministrativeState": 0}
234 if meInstance := mm.pDeviceHandler.pOnuOmciDevice.PDevOmciCC.sendGetMe(ctx, me.VirtualEthernetInterfacePointClassID, veipInstID, requestedAttributes, ConstDefaultOmciTimeout, true, mm.commMetricsChan); meInstance != nil {
235 // Wait for metrics or timeout
236 select {
237 case meAttributes = <-mm.uniStatusMetricsChan:
238 logger.Debugw(ctx, "received veip metrics", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
239 case <-time.After(time.Duration(ConstDefaultOmciTimeout) * time.Second):
240 logger.Errorw(ctx, "timeout waiting for omci-get response for uni status", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
241 // The metrics could be empty in this case
242 break loop3
243 }
244
245 // Populate metric only if it was enabled.
246 for _, v := range enabledMetrics {
247 switch v {
248 case "oper_status":
249 pptpMetrics["oper_status"] = float32(meAttributes["OperationalState"].(byte))
250 case "uni_admin_state":
251 pptpMetrics["uni_admin_state"] = float32(meAttributes["AdministrativeState"].(byte))
252 default:
253 // do nothing
254 }
255 }
256 }
257 // create slice of metrics given that there could be more than one VEIP instance
258 metricInfo := voltha.MetricInformation{Metadata: &mmd, Metrics: pptpMetrics}
259 metricInfoSlice = append(metricInfoSlice, &metricInfo)
260 }
261
262 return metricInfoSlice
263}
264
265// publishMetrics publishes the metrics on kafka
266func (mm *onuMetricsManager) publishMetrics(ctx context.Context, metricInfo []*voltha.MetricInformation) {
267 var ke voltha.KpiEvent2
268 ts := time.Now().UnixNano()
269 ke.SliceData = metricInfo
270 ke.Type = voltha.KpiEventType_slice
271 ke.Ts = float64(ts)
272
273 if err := mm.pDeviceHandler.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, ts); err != nil {
274 logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
275 }
276}
277
278func (mm *onuMetricsManager) processOmciMessages(ctx context.Context) {
279 logger.Infow(ctx, "Start routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
280 // Flush metric collection channels to be safe.
281 // It is possible that there is stale data on this channel if the processOmciMessages routine
282 // is stopped right after issuing a OMCI-GET request and started again.
283 // The processOmciMessages routine will get stopped if startCollector routine (in device_handler.go)
284 // is stopped - as a result of ONU going down.
285 mm.flushMetricCollectionChannels(ctx)
286
287 for {
288 select {
289 case <-mm.stopProcessingOmciResponses: // stop this routine
290 logger.Infow(ctx, "Stop routine to process OMCI-GET messages for device-id", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
291 return
292 case message, ok := <-mm.commMetricsChan:
293 if !ok {
294 logger.Errorw(ctx, "Message couldn't be read from channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
295 continue
296 }
297 logger.Debugw(ctx, "Received message on ONU metrics channel", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
298
299 switch message.Type {
300 case OMCI:
301 msg, _ := message.Data.(OmciMessage)
302 mm.handleOmciMessage(ctx, msg)
303 default:
304 logger.Warn(ctx, "Unknown message type received", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "message.Type": message.Type})
305 }
306 }
307 }
308}
309
310func (mm *onuMetricsManager) handleOmciMessage(ctx context.Context, msg OmciMessage) {
311 logger.Debugw(ctx, "omci Msg", log.Fields{"device-id": mm.pDeviceHandler.deviceID,
312 "msgType": msg.OmciMsg.MessageType, "msg": msg})
313 switch msg.OmciMsg.MessageType {
314 case omci.GetResponseType:
315 //TODO: error handling
316 _ = mm.handleOmciGetResponseMessage(ctx, msg)
317
318 default:
319 logger.Warnw(ctx, "Unknown Message Type", log.Fields{"msgType": msg.OmciMsg.MessageType})
320
321 }
322}
323
324func (mm *onuMetricsManager) handleOmciGetResponseMessage(ctx context.Context, msg OmciMessage) error {
325 msgLayer := (*msg.OmciPacket).Layer(omci.LayerTypeGetResponse)
326 if msgLayer == nil {
327 logger.Errorw(ctx, "omci Msg layer could not be detected for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
328 return fmt.Errorf("omci Msg layer could not be detected for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
329 }
330 msgObj, msgOk := msgLayer.(*omci.GetResponse)
331 if !msgOk {
332 logger.Errorw(ctx, "omci Msg layer could not be assigned for GetResponse - handling stopped", log.Fields{"device-id": mm.pDeviceHandler.deviceID})
333 return fmt.Errorf("omci Msg layer could not be assigned for GetResponse - handling stopped: %s", mm.pDeviceHandler.deviceID)
334 }
335 logger.Debugw(ctx, "OMCI GetResponse Data", log.Fields{"device-id": mm.pDeviceHandler.deviceID, "data-fields": msgObj})
336 if msgObj.Result == me.Success {
337 meAttributes := msgObj.Attributes
338 switch msgObj.EntityClass {
339 case me.AniGClassID:
340 mm.opticalMetricsChan <- meAttributes
341 return nil
342 case me.UniGClassID:
343 mm.uniStatusMetricsChan <- meAttributes
344 return nil
345 case me.PhysicalPathTerminationPointEthernetUniClassID:
346 mm.uniStatusMetricsChan <- meAttributes
347 return nil
348 case me.VirtualEthernetInterfacePointClassID:
349 mm.uniStatusMetricsChan <- meAttributes
350 return nil
351 default:
352 logger.Errorw(ctx, "unhandled omci get response message",
353 log.Fields{"device-id": mm.pDeviceHandler.deviceID, "class-id": msgObj.EntityClass})
354 }
355 }
356
357 return errors.New("unhandled-omci-get-response-message")
358}
359
360// flushMetricCollectionChannels flushes all metric collection channels for any stale OMCI responses
361func (mm *onuMetricsManager) flushMetricCollectionChannels(ctx context.Context) {
362 // flush commMetricsChan
363 select {
364 case <-mm.commMetricsChan:
365 logger.Debug(ctx, "flushed common metrics channel")
366 default:
367 }
368
369 // flush opticalMetricsChan
370 select {
371 case <-mm.opticalMetricsChan:
372 logger.Debug(ctx, "flushed optical metrics channel")
373 default:
374 }
375
376 // flush uniStatusMetricsChan
377 select {
378 case <-mm.uniStatusMetricsChan:
379 logger.Debug(ctx, "flushed uni status metrics channel")
380 default:
381 }
382}