blob: 1744406cb90870897a15d2088416e70f1ee6cb76 [file] [log] [blame]
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
22 "encoding/hex"
23 "errors"
24 "fmt"
Holger Hildebrandt24d51952020-05-04 14:03:42 +000025 "strconv"
26 "strings"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000027 "sync"
28 "time"
29
30 "github.com/gogo/protobuf/proto"
31 "github.com/golang/protobuf/ptypes"
32 "github.com/looplab/fsm"
33 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Holger Hildebrandt24d51952020-05-04 14:03:42 +000035 vc "github.com/opencord/voltha-protos/v3/go/common"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000036 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Holger Hildebrandt24d51952020-05-04 14:03:42 +000037 of "github.com/opencord/voltha-protos/v3/go/openflow_13"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000038 oop "github.com/opencord/voltha-protos/v3/go/openolt"
39 "github.com/opencord/voltha-protos/v3/go/voltha"
40)
41
42/*
43// Constants for number of retries and for timeout
44const (
45 MaxRetry = 10
46 MaxTimeOutInMs = 500
47)
48*/
49
Holger Hildebrandt24d51952020-05-04 14:03:42 +000050//Event category and subcategory definitions - same as defiend for OLT in eventmgr.go - should be done more centrally
51const (
52 pon = voltha.EventSubCategory_PON
53 olt = voltha.EventSubCategory_OLT
54 ont = voltha.EventSubCategory_ONT
55 onu = voltha.EventSubCategory_ONU
56 nni = voltha.EventSubCategory_NNI
57 service = voltha.EventCategory_SERVICE
58 security = voltha.EventCategory_SECURITY
59 equipment = voltha.EventCategory_EQUIPMENT
60 processing = voltha.EventCategory_PROCESSING
61 environment = voltha.EventCategory_ENVIRONMENT
62 communication = voltha.EventCategory_COMMUNICATION
63)
64
// ONU event strings: cEventObjectType is the object-type label and
// cOnuActivatedEvent the name of the activation event.
// NOTE(review): neither constant is referenced in this part of the file;
// the intended use is inferred from the names - confirm against the event code.
const (
	cEventObjectType   = "ONU"
	cOnuActivatedEvent = "ONU_ACTIVATED"
)
71
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000072//DeviceHandler will interact with the ONU ? device.
73type DeviceHandler struct {
74 deviceID string
75 DeviceType string
76 adminState string
77 device *voltha.Device
78 logicalDeviceID string
79 ProxyAddressID string
80 ProxyAddressType string
Holger Hildebrandt24d51952020-05-04 14:03:42 +000081 parentId string
82 ponPortNumber uint32
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000083
84 coreProxy adapterif.CoreProxy
85 AdapterProxy adapterif.AdapterProxy
86 EventProxy adapterif.EventProxy
87 pOpenOnuAc *OpenONUAC
88 pDeviceStateFsm *fsm.FSM
89 pPonPort *voltha.Port
90 pOnuOmciDevice *OnuDeviceEntry
91 exitChannel chan int
92 lockDevice sync.RWMutex
Holger Hildebrandt24d51952020-05-04 14:03:42 +000093 pOnuIndication *oop.OnuIndication
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +000094
95 //Client oop.OpenoltClient
96 //clientCon *grpc.ClientConn
97 //flowMgr *OpenOltFlowMgr
98 //eventMgr *OpenOltEventMgr
99 //resourceMgr *rsrcMgr.OpenOltResourceMgr
100
101 //discOnus sync.Map
102 //onus sync.Map
103 //portStats *OpenOltStatisticsMgr
104 //metrics *pmmetrics.PmMetrics
105 stopCollector chan bool
106 stopHeartbeatCheck chan bool
107 activePorts sync.Map
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000108 uniEntityMap map[uint32]*OnuUniPort
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000109}
110
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000111//NewDeviceHandler creates a new device handler
112func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *DeviceHandler {
113 var dh DeviceHandler
114 dh.coreProxy = cp
115 dh.AdapterProxy = ap
116 dh.EventProxy = ep
117 cloned := (proto.Clone(device)).(*voltha.Device)
118 dh.deviceID = cloned.Id
119 dh.DeviceType = cloned.Type
120 dh.adminState = "up"
121 dh.device = cloned
122 dh.pOpenOnuAc = adapter
123 dh.exitChannel = make(chan int, 1)
124 dh.lockDevice = sync.RWMutex{}
125 dh.stopCollector = make(chan bool, 2)
126 dh.stopHeartbeatCheck = make(chan bool, 2)
127 //dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
128 dh.activePorts = sync.Map{}
129 //TODO initialize the support classes.
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000130 dh.uniEntityMap = make(map[uint32]*OnuUniPort)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000131
132 // Device related state machine
133 dh.pDeviceStateFsm = fsm.NewFSM(
134 "null",
135 fsm.Events{
136 {Name: "DeviceInit", Src: []string{"null", "down"}, Dst: "init"},
137 {Name: "GrpcConnected", Src: []string{"init"}, Dst: "connected"},
138 {Name: "GrpcDisconnected", Src: []string{"connected", "down"}, Dst: "init"},
139 {Name: "DeviceUpInd", Src: []string{"connected", "down"}, Dst: "up"},
140 {Name: "DeviceDownInd", Src: []string{"up"}, Dst: "down"},
141 },
142 fsm.Callbacks{
143 "before_event": func(e *fsm.Event) { dh.logStateChange(e) },
144 "before_DeviceInit": func(e *fsm.Event) { dh.doStateInit(e) },
145 "after_DeviceInit": func(e *fsm.Event) { dh.postInit(e) },
146 "before_GrpcConnected": func(e *fsm.Event) { dh.doStateConnected(e) },
147 "before_GrpcDisconnected": func(e *fsm.Event) { dh.doStateInit(e) },
148 "after_GrpcDisconnected": func(e *fsm.Event) { dh.postInit(e) },
149 "before_DeviceUpInd": func(e *fsm.Event) { dh.doStateUp(e) },
150 "before_DeviceDownInd": func(e *fsm.Event) { dh.doStateDown(e) },
151 },
152 )
153 return &dh
154}
155
156// start save the device to the data model
157func (dh *DeviceHandler) Start(ctx context.Context) {
158 logger.Debugw("starting-device-handler", log.Fields{"device": dh.device, "deviceId": dh.deviceID})
159 // Add the initial device to the local model
160 logger.Debug("device-handler-started")
161}
162
163// stop stops the device dh. Not much to do for now
164func (dh *DeviceHandler) stop(ctx context.Context) {
165 logger.Debug("stopping-device-handler")
166 dh.exitChannel <- 1
167}
168
169// ##########################################################################################
170// DeviceHandler methods that implement the adapters interface requests ##### begin #########
171
172//AdoptDevice adopts the OLT device
173func (dh *DeviceHandler) AdoptDevice(ctx context.Context, device *voltha.Device) {
174 logger.Debugw("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
175
176 logger.Debug("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
177 if dh.pDeviceStateFsm.Is("null") {
178 if err := dh.pDeviceStateFsm.Event("DeviceInit"); err != nil {
179 logger.Errorw("Device FSM: Can't go to state DeviceInit", log.Fields{"err": err})
180 }
181 logger.Debug("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
182 } else {
183 logger.Debug("AdoptDevice: Agent/device init already done")
184 }
185
186 /*
187 // Now, set the initial PM configuration for that device
188 if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
189 logger.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
190 }
191
192 go startCollector(dh)
193 go startHeartbeatCheck(dh)
194 */
195}
196
197//ProcessInterAdapterMessage sends the proxied messages to the target device
198// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
199// is meant, and then send the unmarshalled omci message to this onu
200func (dh *DeviceHandler) ProcessInterAdapterMessage(msg *ic.InterAdapterMessage) error {
201 msgID := msg.Header.Id
202 msgType := msg.Header.Type
203 fromTopic := msg.Header.FromTopic
204 toTopic := msg.Header.ToTopic
205 toDeviceID := msg.Header.ToDeviceId
206 proxyDeviceID := msg.Header.ProxyDeviceId
207 logger.Debugw("InterAdapter message header", log.Fields{"msgID": msgID, "msgType": msgType,
208 "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
209
210 switch msgType {
211 case ic.InterAdapterMessageType_OMCI_REQUEST:
212 {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000213 msgBody := msg.GetBody()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000214 omciMsg := &ic.InterAdapterOmciMessage{}
215 if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
216 logger.Warnw("cannot-unmarshal-omci-msg-body", log.Fields{"error": err})
217 return err
218 }
219
220 //assuming omci message content is hex coded!
221 // with restricted output of 16(?) bytes would be ...omciMsg.Message[:16]
222 logger.Debugw("inter-adapter-recv-omci",
223 log.Fields{"RxOmciMessage": hex.EncodeToString(omciMsg.Message)})
224 //receive_message(omci_msg.message)
225 return dh.GetOnuDeviceEntry().PDevOmciCC.ReceiveMessage(context.TODO(), omciMsg.Message)
226 }
227 case ic.InterAdapterMessageType_ONU_IND_REQUEST:
228 {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000229 msgBody := msg.GetBody()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000230 onu_indication := &oop.OnuIndication{}
231 if err := ptypes.UnmarshalAny(msgBody, onu_indication); err != nil {
232 logger.Warnw("cannot-unmarshal-onu-indication-msg-body", log.Fields{"error": err})
233 return err
234 }
235
236 onu_operstate := onu_indication.GetOperState()
237 logger.Debugw("inter-adapter-recv-onu-ind", log.Fields{"OnuId": onu_indication.GetOnuId(),
238 "AdminState": onu_indication.GetAdminState(), "OperState": onu_operstate,
239 "SNR": onu_indication.GetSerialNumber()})
240
241 //interface related functioons might be error checked ....
242 if onu_operstate == "up" {
243 dh.create_interface(onu_indication)
244 } else if (onu_operstate == "down") || (onu_operstate == "unreachable") {
245 dh.update_interface(onu_indication)
246 } else {
247 logger.Errorw("unknown-onu-indication operState", log.Fields{"OnuId": onu_indication.GetOnuId()})
248 return errors.New("InvalidOperState")
249 }
250 }
251 default:
252 {
253 logger.Errorw("inter-adapter-unhandled-type", log.Fields{"msgType": msg.Header.Type})
254 return errors.New("unimplemented")
255 }
256 }
257
258 /* form py code:
259 elif request.header.type == InterAdapterMessageType.TECH_PROFILE_DOWNLOAD_REQUEST:
260 tech_msg = InterAdapterTechProfileDownloadMessage()
261 request.body.Unpack(tech_msg)
262 self.logger.debug('inter-adapter-recv-tech-profile', tech_msg=tech_msg)
263
264 self.load_and_configure_tech_profile(tech_msg.uni_id, tech_msg.path)
265
266 elif request.header.type == InterAdapterMessageType.DELETE_GEM_PORT_REQUEST:
267 del_gem_msg = InterAdapterDeleteGemPortMessage()
268 request.body.Unpack(del_gem_msg)
269 self.logger.debug('inter-adapter-recv-del-gem', gem_del_msg=del_gem_msg)
270
271 self.delete_tech_profile(uni_id=del_gem_msg.uni_id,
272 gem_port_id=del_gem_msg.gem_port_id,
273 tp_path=del_gem_msg.tp_path)
274
275 elif request.header.type == InterAdapterMessageType.DELETE_TCONT_REQUEST:
276 del_tcont_msg = InterAdapterDeleteTcontMessage()
277 request.body.Unpack(del_tcont_msg)
278 self.logger.debug('inter-adapter-recv-del-tcont', del_tcont_msg=del_tcont_msg)
279
280 self.delete_tech_profile(uni_id=del_tcont_msg.uni_id,
281 alloc_id=del_tcont_msg.alloc_id,
282 tp_path=del_tcont_msg.tp_path)
283 else:
284 self.logger.error("inter-adapter-unhandled-type", request=request)
285 */
286 return nil
287}
288
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000289func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device,
290 portNo int64) (*ic.PortCapability, error) {
291 logger.Debugw("GetOfpPortInfo start", log.Fields{"deviceID": device.Id, "portNo": portNo})
292
293 //function body as per OLTAdapter handler code
294 // adapted with values from py dapter code
295 if pUniPort, exist := dh.uniEntityMap[uint32(portNo)]; exist {
296 var macOctets [6]uint8
297 macOctets[5] = 0x08
298 macOctets[4] = uint8(dh.ponPortNumber >> 8)
299 macOctets[3] = uint8(dh.ponPortNumber)
300 macOctets[2] = uint8(portNo >> 16)
301 macOctets[1] = uint8(portNo >> 8)
302 macOctets[0] = uint8(portNo)
303 hwAddr := genMacFromOctets(macOctets)
304 capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
305 name := device.SerialNumber + "-" + strconv.FormatUint(uint64(pUniPort.macBpNo), 10)
306 ofUniPortState := of.OfpPortState_OFPPS_LINK_DOWN
307 if pUniPort.operState == vc.OperStatus_ACTIVE {
308 ofUniPortState = of.OfpPortState_OFPPS_LIVE
309 }
310 logger.Debugw("setting LogicalPort", log.Fields{"with-name": name,
311 "withUniPort": pUniPort.name, "withMacBase": hwAddr, "OperState": ofUniPortState})
312
313 return &ic.PortCapability{
314 Port: &voltha.LogicalPort{
315 OfpPort: &of.OfpPort{
316 Name: name,
317 //HwAddr: macAddressToUint32Array(dh.device.MacAddress),
318 HwAddr: macAddressToUint32Array(hwAddr),
319 Config: 0,
320 State: uint32(ofUniPortState),
321 Curr: capacity,
322 Advertised: capacity,
323 Peer: capacity,
324 CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
325 MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
326 },
327 DeviceId: device.Id,
328 DevicePortNo: uint32(portNo),
329 },
330 }, nil
331 }
332 logger.Warnw("No UniPort found - abort", log.Fields{"for PortNo": uint32(portNo)})
333 return nil, errors.New("UniPort not found")
334}
335
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000336// DeviceHandler methods that implement the adapters interface requests## end #########
337// #####################################################################################
338
339// ################ to be updated acc. needs of ONU Device ########################
340// DeviceHandler StateMachine related state transition methods ##### begin #########
341
342func (dh *DeviceHandler) logStateChange(e *fsm.Event) {
343 logger.Debugw("Device FSM: ", log.Fields{"event name": string(e.Event), "src state": string(e.Src), "dst state": string(e.Dst), "device-id": dh.deviceID})
344}
345
346// doStateInit provides the device update to the core
347func (dh *DeviceHandler) doStateInit(e *fsm.Event) {
348
349 logger.Debug("doStateInit-started")
350 var err error
351
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000352 // populate what we know. rest comes later after mib sync
353 dh.device.Root = false
354 dh.device.Vendor = "OpenONU"
355 dh.device.Model = "go"
356 dh.device.Reason = "activating-onu"
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000357
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000358 dh.logicalDeviceID = dh.deviceID // really needed - what for ??? //TODO!!!
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000359 dh.coreProxy.DeviceUpdate(context.TODO(), dh.device)
360
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000361 dh.parentId = dh.device.ParentId
362 dh.ponPortNumber = dh.device.ParentPortNo
363
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000364 // store proxy parameters for later communication - assumption: invariant, else they have to be requested dynamically!!
365 dh.ProxyAddressID = dh.device.ProxyAddress.GetDeviceId()
366 dh.ProxyAddressType = dh.device.ProxyAddress.GetDeviceType()
367 logger.Debugw("device-updated", log.Fields{"deviceID": dh.deviceID, "proxyAddressID": dh.ProxyAddressID,
368 "proxyAddressType": dh.ProxyAddressType, "SNR": dh.device.SerialNumber,
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000369 "ParentId": dh.parentId, "ParentPortNo": dh.ponPortNumber})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000370
371 /*
372 self._pon = PonPort.create(self, self._pon_port_number)
373 self._pon.add_peer(self.parent_id, self._pon_port_number)
374 self.logger.debug('adding-pon-port-to-agent',
375 type=self._pon.get_port().type,
376 admin_state=self._pon.get_port().admin_state,
377 oper_status=self._pon.get_port().oper_status,
378 )
379 */
380 logger.Debug("adding-pon-port")
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000381 var ponPortNo uint32 = 1
382 if dh.ponPortNumber != 0 {
383 ponPortNo = dh.ponPortNumber
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000384 }
385
386 pPonPort := &voltha.Port{
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000387 PortNo: ponPortNo,
388 Label: fmt.Sprintf("pon-%d", ponPortNo),
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000389 Type: voltha.Port_PON_ONU,
390 OperStatus: voltha.OperStatus_ACTIVE,
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000391 Peers: []*voltha.Port_PeerPort{{DeviceId: dh.parentId, // Peer device is OLT
392 PortNo: ponPortNo}}, // Peer port is parent's port number
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000393 }
394 if err = dh.coreProxy.PortCreated(context.TODO(), dh.deviceID, pPonPort); err != nil {
395 logger.Fatalf("Device FSM: PortCreated-failed-%s", err)
396 e.Cancel(err)
397 return
398 }
399 logger.Debug("doStateInit-done")
400}
401
402// postInit setups the DeviceEntry for the conerned device
403func (dh *DeviceHandler) postInit(e *fsm.Event) {
404
405 logger.Debug("postInit-started")
406 var err error
407 /*
408 dh.Client = oop.NewOpenoltClient(dh.clientCon)
409 dh.pTransitionMap.Handle(ctx, GrpcConnected)
410 return nil
411 */
412 if err = dh.Add_OnuDeviceEntry(context.TODO()); err != nil {
413 logger.Fatalf("Device FSM: Add_OnuDeviceEntry-failed-%s", err)
414 e.Cancel(err)
415 return
416 }
417
418 /*
419 ############################################################################
420 # Setup Alarm handler
421 self.events = AdapterEvents(self.core_proxy, device.id, self.logical_device_id,
422 device.serial_number)
423 ############################################################################
424 # Setup PM configuration for this device
425 # Pass in ONU specific options
426 kwargs = {
427 OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
428 'heartbeat': self.heartbeat,
429 OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
430 }
431 self.logger.debug('create-pm-metrics', device_id=device.id, serial_number=device.serial_number)
432 self._pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
433 self.logical_device_id, device.serial_number,
434 grouped=True, freq_override=False, **kwargs)
435 pm_config = self._pm_metrics.make_proto()
436 self._onu_omci_device.set_pm_config(self._pm_metrics.omci_pm.openomci_interval_pm)
437 self.logger.info("initial-pm-config", device_id=device.id, serial_number=device.serial_number)
438 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
439
440 # Note, ONU ID and UNI intf set in add_uni_port method
441 self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
442 ani_ports=[self._pon])
443
444 # Code to Run OMCI Test Action
445 kwargs_omci_test_action = {
446 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
447 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
448 }
449 serial_number = device.serial_number
450 self._test_request = OmciTestRequest(self.core_proxy,
451 self.omci_agent, self.device_id,
452 AniG, serial_number,
453 self.logical_device_id,
454 exclusive=False,
455 **kwargs_omci_test_action)
456
457 self.enabled = True
458 else:
459 self.logger.info('onu-already-activated')
460 */
461 logger.Debug("postInit-done")
462}
463
464// doStateConnected get the device info and update to voltha core
465// for comparison of the original method (not that easy to uncomment): compare here:
466// voltha-openolt-adapter/adaptercore/device_handler.go
467// -> this one obviously initiates all communication interfaces of the device ...?
468func (dh *DeviceHandler) doStateConnected(e *fsm.Event) {
469
470 logger.Debug("doStateConnected-started")
471 var err error
472 err = errors.New("Device FSM: function not implemented yet!")
473 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000474 logger.Debug("doStateConnected-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700475 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000476}
477
478// doStateUp handle the onu up indication and update to voltha core
479func (dh *DeviceHandler) doStateUp(e *fsm.Event) {
480
481 logger.Debug("doStateUp-started")
482 var err error
483 err = errors.New("Device FSM: function not implemented yet!")
484 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000485 logger.Debug("doStateUp-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700486 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000487
488 /*
489 // Synchronous call to update device state - this method is run in its own go routine
490 if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
491 voltha.OperStatus_ACTIVE); err != nil {
492 logger.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceID": dh.device.Id, "error": err})
493 return err
494 }
495 return nil
496 */
497}
498
499// doStateDown handle the onu down indication
500func (dh *DeviceHandler) doStateDown(e *fsm.Event) {
501
502 logger.Debug("doStateDown-started")
503 var err error
504
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000505 device := dh.device
506 if device == nil {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000507 /*TODO: needs to handle error scenarios */
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000508 logger.Error("Failed to fetch handler device")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000509 e.Cancel(err)
510 return
511 }
512
513 cloned := proto.Clone(device).(*voltha.Device)
514 logger.Debugw("do-state-down", log.Fields{"ClonedDeviceID": cloned.Id})
515 /*
516 // Update the all ports state on that device to disable
517 if er := dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
518 logger.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": er})
519 return er
520 }
521
522 //Update the device oper state and connection status
523 cloned.OperStatus = voltha.OperStatus_UNKNOWN
524 cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
525 dh.device = cloned
526
527 if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
528 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": er})
529 return er
530 }
531
532 //get the child device for the parent device
533 onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
534 if err != nil {
535 logger.Errorw("failed to get child devices information", log.Fields{"deviceID": dh.device.Id, "error": err})
536 return err
537 }
538 for _, onuDevice := range onuDevices.Items {
539
540 // Update onu state as down in onu adapter
541 onuInd := oop.OnuIndication{}
542 onuInd.OperState = "down"
543 er := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
544 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
545 if er != nil {
546 logger.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
547 "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
548 //Do not return here and continue to process other ONUs
549 }
550 }
551 // * Discovered ONUs entries need to be cleared , since after OLT
552 // is up, it starts sending discovery indications again* /
553 dh.discOnus = sync.Map{}
554 logger.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
555 return nil
556 */
557 err = errors.New("Device FSM: function not implemented yet!")
558 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000559 logger.Debug("doStateDown-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700560 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000561}
562
563// DeviceHandler StateMachine related state transition methods ##### end #########
564// #################################################################################
565
566// ###################################################
567// DeviceHandler utility methods ##### begin #########
568
569// Get ONU device entry for this deviceId specific handler
570func (dh *DeviceHandler) GetOnuDeviceEntry() *OnuDeviceEntry {
571 dh.lockDevice.Lock()
572 defer dh.lockDevice.Unlock()
573 if dh.pOnuOmciDevice != nil {
574 logger.Debugw("GetOnuDeviceEntry params:",
575 log.Fields{"onu_device_entry": dh.pOnuOmciDevice, "device_id": dh.pOnuOmciDevice.deviceID,
576 "device_handler": dh.pOnuOmciDevice.baseDeviceHandler, "core_proxy": dh.pOnuOmciDevice.coreProxy, "adapter_proxy": dh.pOnuOmciDevice.adapterProxy})
577 } else {
578 logger.Error("GetOnuDeviceEntry returns nil")
579 }
580 return dh.pOnuOmciDevice
581}
582
583// Set ONU device entry
584func (dh *DeviceHandler) SetOnuDeviceEntry(pDeviceEntry *OnuDeviceEntry) error {
585 dh.lockDevice.Lock()
586 defer dh.lockDevice.Unlock()
587 dh.pOnuOmciDevice = pDeviceEntry
588 return nil
589}
590
591//creates a new ONU device or returns the existing
592func (dh *DeviceHandler) Add_OnuDeviceEntry(ctx context.Context) error {
593 logger.Debugw("adding-deviceEntry", log.Fields{"for deviceId": dh.deviceID})
594
595 deviceEntry := dh.GetOnuDeviceEntry()
596 if deviceEntry == nil {
597 /* costum_me_map in python code seems always to be None,
598 we omit that here first (declaration unclear) -> todo at Adapter specialization ...*/
599 /* also no 'clock' argument - usage open ...*/
600 /* and no alarm_db yet (oo.alarm_db) */
601 deviceEntry = NewOnuDeviceEntry(ctx, dh.deviceID, dh, dh.coreProxy, dh.AdapterProxy,
602 dh.pOpenOnuAc.pSupportedFsms) //nil as FSM pointer would yield deviceEntry internal defaults ...
603 //error treatment possible //TODO!!!
604 dh.SetOnuDeviceEntry(deviceEntry)
605 logger.Infow("onuDeviceEntry-added", log.Fields{"for deviceId": dh.deviceID})
606 } else {
607 logger.Infow("onuDeviceEntry-add: Device already exists", log.Fields{"for deviceId": dh.deviceID})
608 }
609 // might be updated with some error handling !!!
610 return nil
611}
612
613// doStateInit provides the device update to the core
614func (dh *DeviceHandler) create_interface(onuind *oop.OnuIndication) error {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000615 logger.Debugw("create_interface-started", log.Fields{"OnuId": onuind.GetOnuId(),
616 "OnuIntfId": onuind.GetIntfId(), "OnuSerialNumber": onuind.GetSerialNumber()})
617
618 dh.pOnuIndication = onuind // let's revise if storing the pointer is sufficient...
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000619
620 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING); err != nil {
621 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.deviceID, "error": err})
622 }
623
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000624 // It does not look to me as if makes sense to work with the real core device here, (not the stored clone)?
625 // in this code the GetDevice would just make a check if the DeviceID's Device still exists in core
626 // in python code it looks as the started onu_omci_device might have been updated with some new instance state of the core device
627 // but I would not know why, and the go code anyway dows not work with the device directly anymore in the OnuDeviceEntry
628 // so let's just try to keep it simple ...
629 /*
630 device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
631 if err != nil || device == nil {
632 //TODO: needs to handle error scenarios
633 logger.Errorw("Failed to fetch device device at creating If", log.Fields{"err": err})
634 return errors.New("Voltha Device not found")
635 }
636 */
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000637
638 dh.GetOnuDeviceEntry().Start(context.TODO())
639 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "starting-openomci"); err != nil {
640 logger.Errorw("error-DeviceReasonUpdate to starting-openomci", log.Fields{"deviceID": dh.deviceID, "error": err})
641 }
642
643 /* this might be a good time for Omci Verify message? */
644 verifyExec := make(chan bool)
645 omci_verify := NewOmciTestRequest(context.TODO(),
646 dh.device.Id, dh.GetOnuDeviceEntry().PDevOmciCC,
647 true, true) //eclusive and allowFailure (anyway not yet checked)
648 omci_verify.PerformOmciTest(context.TODO(), verifyExec)
649
650 /* give the handler some time here to wait for the OMCi verification result
651 after Timeout start and try MibUpload FSM anyway
652 (to prevent stopping on just not supported OMCI verification from ONU) */
653 select {
654 case <-time.After(2 * time.Second):
655 logger.Warn("omci start-verification timed out (continue normal)")
656 case testresult := <-verifyExec:
657 logger.Infow("Omci start verification done", log.Fields{"result": testresult})
658 }
659
660 /* In py code it looks earlier (on activate ..)
661 # Code to Run OMCI Test Action
662 kwargs_omci_test_action = {
663 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
664 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
665 }
666 serial_number = device.serial_number
667 self._test_request = OmciTestRequest(self.core_proxy,
668 self.omci_agent, self.device_id,
669 AniG, serial_number,
670 self.logical_device_id,
671 exclusive=False,
672 **kwargs_omci_test_action)
673 ...
674 # Start test requests after a brief pause
675 if not self._test_request_started:
676 self._test_request_started = True
677 tststart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
678 reactor.callLater(tststart, self._test_request.start_collector)
679
680 */
681 /* which is then: in omci_test_request.py : */
682 /*
683 def start_collector(self, callback=None):
684 """
685 Start the collection loop for an adapter if the frequency > 0
686
687 :param callback: (callable) Function to call to collect PM data
688 """
689 self.logger.info("starting-pm-collection", device_name=self.name, default_freq=self.default_freq)
690 if callback is None:
691 callback = self.perform_test_omci
692
693 if self.lc is None:
694 self.lc = LoopingCall(callback)
695
696 if self.default_freq > 0:
697 self.lc.start(interval=self.default_freq / 10)
698
699 def perform_test_omci(self):
700 """
701 Perform the initial test request
702 """
703 ani_g_entities = self._device.configuration.ani_g_entities
704 ani_g_entities_ids = list(ani_g_entities.keys()) if ani_g_entities \
705 is not None else None
706 self._entity_id = ani_g_entities_ids[0]
707 self.logger.info('perform-test', entity_class=self._entity_class,
708 entity_id=self._entity_id)
709 try:
710 frame = MEFrame(self._entity_class, self._entity_id, []).test()
711 result = yield self._device.omci_cc.send(frame)
712 if not result.fields['omci_message'].fields['success_code']:
713 self.logger.info('Self-Test Submitted Successfully',
714 code=result.fields[
715 'omci_message'].fields['success_code'])
716 else:
717 raise TestFailure('Test Failure: {}'.format(
718 result.fields['omci_message'].fields['success_code']))
719 except TimeoutError as e:
720 self.deferred.errback(failure.Failure(e))
721
722 except Exception as e:
723 self.logger.exception('perform-test-Error', e=e,
724 class_id=self._entity_class,
725 entity_id=self._entity_id)
726 self.deferred.errback(failure.Failure(e))
727
728 */
729
730 // PM related heartbeat??? !!!TODO....
731 //self._heartbeat.enabled = True
732
733 //example how to call FSM - transition up to state "uploading"
734 if dh.GetOnuDeviceEntry().MibSyncFsm.Is("disabled") {
735
736 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("start"); err != nil {
737 logger.Errorw("MibSyncFsm: Can't go to state starting", log.Fields{"err": err})
738 return errors.New("Can't go to state starting")
739 } else {
740 logger.Debug("MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
741 //Determine ONU status and start/re-start MIB Synchronization tasks
742 //Determine if this ONU has ever synchronized
743 if true { //TODO: insert valid check
744 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("load_mib_template"); err != nil {
745 logger.Errorw("MibSyncFsm: Can't go to state loading_mib_template", log.Fields{"err": err})
746 return errors.New("Can't go to state loading_mib_template")
747 } else {
748 logger.Debug("MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
749 //Find and load a mib template. If not found proceed with mib_upload
750 // callbacks to be handled:
751 // Event("success")
752 // Event("timeout")
753 //no mib template found
754 if true { //TODO: insert valid check
755 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("upload_mib"); err != nil {
756 logger.Errorw("MibSyncFsm: Can't go to state uploading", log.Fields{"err": err})
757 return errors.New("Can't go to state uploading")
758 } else {
759 logger.Debug("state of MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
760 //Begin full MIB data upload, starting with a MIB RESET
761 // callbacks to be handled:
762 // success: e.Event("success")
763 // failure: e.Event("timeout")
764 }
765 }
766 }
767 } else {
768 dh.GetOnuDeviceEntry().MibSyncFsm.Event("examine_mds")
769 logger.Debug("state of MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
770 //Examine the MIB Data Sync
771 // callbacks to be handled:
772 // Event("success")
773 // Event("timeout")
774 // Event("mismatch")
775 }
776 }
777 } else {
778 logger.Errorw("wrong state of MibSyncFsm - want: disabled", log.Fields{"have": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
779 return errors.New("wrong state of MibSyncFsm")
780 }
781 return nil
782}
783
784func (dh *DeviceHandler) update_interface(onuind *oop.OnuIndication) error {
785 logger.Debug("update_interface-started - not yet implemented")
786 return nil
787}
788
789func (dh *DeviceHandler) DeviceStateUpdate(dev_Event OnuDeviceEvent) {
790 if dev_Event == MibDatabaseSync {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000791 logger.Debugw("MibInSync event: update dev state to 'MibSync complete'", log.Fields{"deviceID": dh.deviceID})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000792 //initiate DevStateUpdate
793 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "discovery-mibsync-complete"); err != nil {
794 logger.Errorw("error-DeviceReasonUpdate to 'mibsync-complete'", log.Fields{"deviceID": dh.deviceID, "error": err})
795 }
796
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000797 for i := uint16(0); i < dh.GetOnuDeviceEntry().pOnuDB.unigMeCount; i++ {
798 mgmtEntityId, _ := dh.GetOnuDeviceEntry().pOnuDB.unigMe[i].GetAttribute("ManagedEntityId")
799 logger.Debugw("Add UNI port for stored UniG instance:", log.Fields{"deviceId": dh.GetOnuDeviceEntry().deviceID, "UnigMe EntityID": mgmtEntityId})
800 dh.addUniPort(mgmtEntityId.(uint16), i, UniPPTP)
801 }
802
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000803 // fixed assumption about PPTP/UNI-G ONU-config
804 // to be replaced by DB parsing of MibUpload data TODO!!!
805 // parameters are: InstanceNo, running UniNo, type
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000806 // dh.addUniPort(257, 0, UniPPTP)
807 // dh.addUniPort(258, 1, UniPPTP)
808 // dh.addUniPort(259, 2, UniPPTP)
809 // dh.addUniPort(260, 3, UniPPTP)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000810
811 // start the MibDownload (assumed here to be done via some FSM again - open //TODO!!!)
812 /* the mib-download code may look something like that:
813 if err := dh.GetOnuDeviceEntry().MibDownloadFsm.Event("start"); err != nil {
814 logger.Errorw("MibDownloadFsm: Can't go to state starting", log.Fields{"err": err})
815 return errors.New("Can't go to state starting")
816 } else {
817 logger.Debug("MibDownloadFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibDownloadFsm.Current())})
818 //Determine ONU status and start/re-start MIB MibDownloadFsm
819 //Determine if this ONU has ever synchronized
820 if true { //TODO: insert valid check
821 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("download_mib"); err != nil {
822 logger.Errorw("MibDownloadFsm: Can't go to state 'download_mib'", log.Fields{"err": err})
823 return errors.New("Can't go to state 'download_mib'")
824 } else {
825 //some further processing ???
826 logger.Debug("state of MibDownloadFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibDownloadFsm.Current())})
827 //some further processing ???
828 }
829 }
830 }
831 but by now we shortcut the download here and immediately fake the ONU-active state to get the state indication on ONUS!!!:
832 */
833 //shortcut code to fake download-done!!!:
834 go dh.GetOnuDeviceEntry().transferSystemEvent(MibDownloadDone)
835 } else if dev_Event == MibDownloadDone {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000836 logger.Debugw("MibDownloadDone event: update dev state to 'Oper.Active'", log.Fields{"deviceID": dh.deviceID})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000837 //initiate DevStateUpdate
838 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID,
839 voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
840 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.deviceID, "error": err})
841 }
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000842 logger.Debug("MibDownloadDone Event: update dev reason to 'initial-mib-downloaded'")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000843 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "initial-mib-downloaded"); err != nil {
844 logger.Errorw("error-DeviceReasonUpdate to 'initial-mib-downloaded'",
845 log.Fields{"deviceID": dh.deviceID, "error": err})
846 }
847
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000848 go dh.enableUniPortStateUpdate(dh.deviceID) //cmp python yield self.enable_ports()
849
850 raisedTs := time.Now().UnixNano()
851 go dh.sendOnuOperStateEvent(voltha.OperStatus_ACTIVE, dh.deviceID, raisedTs) //cmp python onu_active_event
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000852 } else {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000853 logger.Warnw("unhandled-device-event", log.Fields{"deviceID": dh.deviceID, "event": dev_Event})
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000854 }
855}
856
857func (dh *DeviceHandler) addUniPort(a_uniInstNo uint16, a_uniId uint16, a_portType UniPortType) {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000858 // parameters are IntfId, OnuId, uniId
859 uniNo := MkUniPortNum(dh.pOnuIndication.GetIntfId(), dh.pOnuIndication.GetOnuId(),
860 uint32(a_uniId))
861 if _, present := dh.uniEntityMap[uniNo]; present {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000862 logger.Warnw("onuUniPort-add: Port already exists", log.Fields{"for InstanceId": a_uniInstNo})
863 } else {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000864 //with arguments a_uniId, a_portNo, a_portType
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000865 pUniPort := NewOnuUniPort(a_uniId, uniNo, a_uniInstNo, a_portType)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000866 if pUniPort == nil {
867 logger.Warnw("onuUniPort-add: Could not create Port", log.Fields{"for InstanceId": a_uniInstNo})
868 } else {
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000869 //store UniPort with the System-PortNumber key
870 dh.uniEntityMap[uniNo] = pUniPort
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000871 // create announce the UniPort to the core as VOLTHA Port object
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000872 if err := pUniPort.CreateVolthaPort(dh); err == nil {
873 logger.Infow("onuUniPort-added", log.Fields{"for PortNo": uniNo})
874 } //error logging already within UniPort method
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000875 }
876 }
877}
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000878
879// Enable listen on UniPortState changes and update core port state accordingly
880func (dh *DeviceHandler) enableUniPortStateUpdate(a_deviceID string) {
Holger Hildebrandtbe674422020-05-05 13:05:30 +0000881 // py code was updated 2003xx to activate the real ONU UNI ports per OMCI (VEIP or PPTP)
882 // but towards core only the first port active state is signalled
883 // with following remark:
884 // # TODO: for now only support the first UNI given no requirement for multiple uni yet. Also needed to reduce flow
885 // # load on the core
886
887 // dh.lock_ports(false) ONU port activation via OMCI //TODO!!! not yet supported
888
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000889 for uniNo, uniPort := range dh.uniEntityMap {
Holger Hildebrandtbe674422020-05-05 13:05:30 +0000890 // only if this port is validated for operState transfer}
891 if (1<<uniPort.uniId)&ActiveUniPortStateUpdateMask == (1 << uniPort.uniId) {
892 logger.Infow("onuUniPort-forced-OperState-ACTIVE", log.Fields{"for PortNo": uniNo})
893 uniPort.SetOperState(vc.OperStatus_ACTIVE)
894 //maybe also use getter functions on uniPort - perhaps later ...
895 go dh.coreProxy.PortStateUpdate(context.TODO(), a_deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
896 }
Holger Hildebrandt24d51952020-05-04 14:03:42 +0000897 }
898}
899
900// ONU_Active/Inactive announcement on system KAFKA bus
901// tried to re-use procedure of oltUpDownIndication from openolt_eventmgr.go with used values from Py code
902func (dh *DeviceHandler) sendOnuOperStateEvent(a_OperState vc.OperStatus_Types, a_deviceID string, raisedTs int64) {
903 var de voltha.DeviceEvent
904 eventContext := make(map[string]string)
905 //Populating event context
906 // assume giving ParentId in GetDevice twice really gives the ParentDevice (there is no GetParentDevice()...)
907 parentDevice, err := dh.coreProxy.GetDevice(context.TODO(), dh.parentId, dh.parentId)
908 if err != nil || parentDevice == nil {
909 logger.Errorw("Failed to fetch parent device for OnuEvent",
910 log.Fields{"parentId": dh.parentId, "err": err})
911 }
912 oltSerialNumber := parentDevice.SerialNumber
913
914 eventContext["pon-id"] = strconv.FormatUint(uint64(dh.pOnuIndication.IntfId), 10)
915 eventContext["onu-id"] = strconv.FormatUint(uint64(dh.pOnuIndication.OnuId), 10)
916 eventContext["serial-number"] = dh.device.SerialNumber
917 eventContext["olt_serial_number"] = oltSerialNumber
918 eventContext["device_id"] = a_deviceID
919 eventContext["registration_id"] = a_deviceID //py: string(device_id)??
920 logger.Debugw("prepare ONU_ACTIVATED event",
921 log.Fields{"DeviceId": a_deviceID, "EventContext": eventContext})
922
923 /* Populating device event body */
924 de.Context = eventContext
925 de.ResourceId = a_deviceID
926 if a_OperState == voltha.OperStatus_ACTIVE {
927 de.DeviceEventName = fmt.Sprintf("%s_%s", cOnuActivatedEvent, "RAISE_EVENT")
928 de.Description = fmt.Sprintf("%s Event - %s - %s",
929 cEventObjectType, cOnuActivatedEvent, "Raised")
930 } else {
931 de.DeviceEventName = fmt.Sprintf("%s_%s", cOnuActivatedEvent, "CLEAR_EVENT")
932 de.Description = fmt.Sprintf("%s Event - %s - %s",
933 cEventObjectType, cOnuActivatedEvent, "Cleared")
934 }
935 /* Send event to KAFKA */
936 if err := dh.EventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
937 logger.Warnw("could not send ONU_ACTIVATED event",
938 log.Fields{"DeviceId": a_deviceID, "error": err})
939 }
940 logger.Debugw("ONU_ACTIVATED event sent to KAFKA",
941 log.Fields{"DeviceId": a_deviceID, "with-EventName": de.DeviceEventName})
942}
943
944/* *********************************************************** */
945
// genMacFromOctets renders the six given octets as a colon-separated MAC
// string of two-digit lowercase hex values. The octets are emitted in reverse
// index order, i.e. a_octets[5] comes first and a_octets[0] last.
func genMacFromOctets(a_octets [6]uint8) string {
	last := len(a_octets) - 1
	mac := fmt.Sprintf("%02x", a_octets[last])
	for idx := last - 1; idx >= 0; idx-- {
		mac += fmt.Sprintf(":%02x", a_octets[idx])
	}
	return mac
}
951
// macAddressToUint32Array splits a colon-separated hex MAC string and returns
// one uint32 per octet, in the order given in the string.
//
// If any element is not a valid hex octet (empty, non-hex, signed or larger
// than 0xff) the fixed fallback {1, 2, 3, 4, 5, 6} is returned instead.
// The number of elements is not checked.
//
//copied from OLT Adapter: unify centrally ?
func macAddressToUint32Array(mac string) []uint32 {
	octets := strings.Split(mac, ":")
	result := make([]uint32, len(octets))
	for index, val := range octets {
		// bitSize 8 and the unsigned parser reject values like "-1" or "1ff",
		// which are no MAC octets
		octet, err := strconv.ParseUint(val, 16, 8)
		if err != nil {
			return []uint32{1, 2, 3, 4, 5, 6}
		}
		result[index] = uint32(octet)
	}
	return result
}