blob: 21e72fc69600504310d64d814c3170fb3e7beae0 [file] [log] [blame]
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +00001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
22 "encoding/hex"
23 "errors"
24 "fmt"
25 "sync"
26 "time"
27
28 "github.com/gogo/protobuf/proto"
29 "github.com/golang/protobuf/ptypes"
30 "github.com/looplab/fsm"
31 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
34 oop "github.com/opencord/voltha-protos/v3/go/openolt"
35 "github.com/opencord/voltha-protos/v3/go/voltha"
36)
37
38/*
39// Constants for number of retries and for timeout
40const (
41 MaxRetry = 10
42 MaxTimeOutInMs = 500
43)
44*/
45
46//DeviceHandler will interact with the ONU ? device.
47type DeviceHandler struct {
48 deviceID string
49 DeviceType string
50 adminState string
51 device *voltha.Device
52 logicalDeviceID string
53 ProxyAddressID string
54 ProxyAddressType string
55
56 coreProxy adapterif.CoreProxy
57 AdapterProxy adapterif.AdapterProxy
58 EventProxy adapterif.EventProxy
59 pOpenOnuAc *OpenONUAC
60 pDeviceStateFsm *fsm.FSM
61 pPonPort *voltha.Port
62 pOnuOmciDevice *OnuDeviceEntry
63 exitChannel chan int
64 lockDevice sync.RWMutex
65
66 //Client oop.OpenoltClient
67 //clientCon *grpc.ClientConn
68 //flowMgr *OpenOltFlowMgr
69 //eventMgr *OpenOltEventMgr
70 //resourceMgr *rsrcMgr.OpenOltResourceMgr
71
72 //discOnus sync.Map
73 //onus sync.Map
74 //portStats *OpenOltStatisticsMgr
75 //metrics *pmmetrics.PmMetrics
76 stopCollector chan bool
77 stopHeartbeatCheck chan bool
78 activePorts sync.Map
79 uniEntityMap map[uint16]*OnuUniPort
80}
81
82/*
83//OnuDevice represents ONU related info
84type OnuDevice struct {
85 deviceID string
86 deviceType string
87 serialNumber string
88 onuID uint32
89 intfID uint32
90 proxyDeviceID string
91 uniPorts map[uint32]struct{}
92}
93
94//NewOnuDevice creates a new Onu Device
95func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string) *OnuDevice {
96 var device OnuDevice
97 device.deviceID = devID
98 device.deviceType = deviceTp
99 device.serialNumber = serialNum
100 device.onuID = onuID
101 device.intfID = intfID
102 device.proxyDeviceID = proxyDevID
103 device.uniPorts = make(map[uint32]struct{})
104 return &device
105}
106*/
107
108//NewDeviceHandler creates a new device handler
109func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *DeviceHandler {
110 var dh DeviceHandler
111 dh.coreProxy = cp
112 dh.AdapterProxy = ap
113 dh.EventProxy = ep
114 cloned := (proto.Clone(device)).(*voltha.Device)
115 dh.deviceID = cloned.Id
116 dh.DeviceType = cloned.Type
117 dh.adminState = "up"
118 dh.device = cloned
119 dh.pOpenOnuAc = adapter
120 dh.exitChannel = make(chan int, 1)
121 dh.lockDevice = sync.RWMutex{}
122 dh.stopCollector = make(chan bool, 2)
123 dh.stopHeartbeatCheck = make(chan bool, 2)
124 //dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
125 dh.activePorts = sync.Map{}
126 //TODO initialize the support classes.
127 dh.uniEntityMap = make(map[uint16]*OnuUniPort)
128
129 // Device related state machine
130 dh.pDeviceStateFsm = fsm.NewFSM(
131 "null",
132 fsm.Events{
133 {Name: "DeviceInit", Src: []string{"null", "down"}, Dst: "init"},
134 {Name: "GrpcConnected", Src: []string{"init"}, Dst: "connected"},
135 {Name: "GrpcDisconnected", Src: []string{"connected", "down"}, Dst: "init"},
136 {Name: "DeviceUpInd", Src: []string{"connected", "down"}, Dst: "up"},
137 {Name: "DeviceDownInd", Src: []string{"up"}, Dst: "down"},
138 },
139 fsm.Callbacks{
140 "before_event": func(e *fsm.Event) { dh.logStateChange(e) },
141 "before_DeviceInit": func(e *fsm.Event) { dh.doStateInit(e) },
142 "after_DeviceInit": func(e *fsm.Event) { dh.postInit(e) },
143 "before_GrpcConnected": func(e *fsm.Event) { dh.doStateConnected(e) },
144 "before_GrpcDisconnected": func(e *fsm.Event) { dh.doStateInit(e) },
145 "after_GrpcDisconnected": func(e *fsm.Event) { dh.postInit(e) },
146 "before_DeviceUpInd": func(e *fsm.Event) { dh.doStateUp(e) },
147 "before_DeviceDownInd": func(e *fsm.Event) { dh.doStateDown(e) },
148 },
149 )
150 return &dh
151}
152
153// start save the device to the data model
154func (dh *DeviceHandler) Start(ctx context.Context) {
155 logger.Debugw("starting-device-handler", log.Fields{"device": dh.device, "deviceId": dh.deviceID})
156 // Add the initial device to the local model
157 logger.Debug("device-handler-started")
158}
159
160// stop stops the device dh. Not much to do for now
161func (dh *DeviceHandler) stop(ctx context.Context) {
162 logger.Debug("stopping-device-handler")
163 dh.exitChannel <- 1
164}
165
166// ##########################################################################################
167// DeviceHandler methods that implement the adapters interface requests ##### begin #########
168
169//AdoptDevice adopts the OLT device
170func (dh *DeviceHandler) AdoptDevice(ctx context.Context, device *voltha.Device) {
171 logger.Debugw("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
172
173 logger.Debug("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
174 if dh.pDeviceStateFsm.Is("null") {
175 if err := dh.pDeviceStateFsm.Event("DeviceInit"); err != nil {
176 logger.Errorw("Device FSM: Can't go to state DeviceInit", log.Fields{"err": err})
177 }
178 logger.Debug("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
179 } else {
180 logger.Debug("AdoptDevice: Agent/device init already done")
181 }
182
183 /*
184 // Now, set the initial PM configuration for that device
185 if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
186 logger.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
187 }
188
189 go startCollector(dh)
190 go startHeartbeatCheck(dh)
191 */
192}
193
194//ProcessInterAdapterMessage sends the proxied messages to the target device
195// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
196// is meant, and then send the unmarshalled omci message to this onu
197func (dh *DeviceHandler) ProcessInterAdapterMessage(msg *ic.InterAdapterMessage) error {
198 msgID := msg.Header.Id
199 msgType := msg.Header.Type
200 fromTopic := msg.Header.FromTopic
201 toTopic := msg.Header.ToTopic
202 toDeviceID := msg.Header.ToDeviceId
203 proxyDeviceID := msg.Header.ProxyDeviceId
204 logger.Debugw("InterAdapter message header", log.Fields{"msgID": msgID, "msgType": msgType,
205 "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
206
207 switch msgType {
208 case ic.InterAdapterMessageType_OMCI_REQUEST:
209 {
210 /* TOBECHECKED: I assume, ONU Adapter receives the message hier already 'unmarshalled'? else: (howTo?)*/
211 msgBody := msg.GetBody()
212
213 omciMsg := &ic.InterAdapterOmciMessage{}
214 if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
215 logger.Warnw("cannot-unmarshal-omci-msg-body", log.Fields{"error": err})
216 return err
217 }
218
219 //assuming omci message content is hex coded!
220 // with restricted output of 16(?) bytes would be ...omciMsg.Message[:16]
221 logger.Debugw("inter-adapter-recv-omci",
222 log.Fields{"RxOmciMessage": hex.EncodeToString(omciMsg.Message)})
223 //receive_message(omci_msg.message)
224 return dh.GetOnuDeviceEntry().PDevOmciCC.ReceiveMessage(context.TODO(), omciMsg.Message)
225 }
226 case ic.InterAdapterMessageType_ONU_IND_REQUEST:
227 {
228 /* TOBECHECKED: I assume, ONU Adapter receives the message hier already 'unmarshalled'? else: see above omci block */
229 msgBody := msg.GetBody()
230
231 onu_indication := &oop.OnuIndication{}
232 if err := ptypes.UnmarshalAny(msgBody, onu_indication); err != nil {
233 logger.Warnw("cannot-unmarshal-onu-indication-msg-body", log.Fields{"error": err})
234 return err
235 }
236
237 onu_operstate := onu_indication.GetOperState()
238 logger.Debugw("inter-adapter-recv-onu-ind", log.Fields{"OnuId": onu_indication.GetOnuId(),
239 "AdminState": onu_indication.GetAdminState(), "OperState": onu_operstate,
240 "SNR": onu_indication.GetSerialNumber()})
241
242 //interface related functioons might be error checked ....
243 if onu_operstate == "up" {
244 dh.create_interface(onu_indication)
245 } else if (onu_operstate == "down") || (onu_operstate == "unreachable") {
246 dh.update_interface(onu_indication)
247 } else {
248 logger.Errorw("unknown-onu-indication operState", log.Fields{"OnuId": onu_indication.GetOnuId()})
249 return errors.New("InvalidOperState")
250 }
251 }
252 default:
253 {
254 logger.Errorw("inter-adapter-unhandled-type", log.Fields{"msgType": msg.Header.Type})
255 return errors.New("unimplemented")
256 }
257 }
258
259 /* form py code:
260 elif request.header.type == InterAdapterMessageType.TECH_PROFILE_DOWNLOAD_REQUEST:
261 tech_msg = InterAdapterTechProfileDownloadMessage()
262 request.body.Unpack(tech_msg)
263 self.logger.debug('inter-adapter-recv-tech-profile', tech_msg=tech_msg)
264
265 self.load_and_configure_tech_profile(tech_msg.uni_id, tech_msg.path)
266
267 elif request.header.type == InterAdapterMessageType.DELETE_GEM_PORT_REQUEST:
268 del_gem_msg = InterAdapterDeleteGemPortMessage()
269 request.body.Unpack(del_gem_msg)
270 self.logger.debug('inter-adapter-recv-del-gem', gem_del_msg=del_gem_msg)
271
272 self.delete_tech_profile(uni_id=del_gem_msg.uni_id,
273 gem_port_id=del_gem_msg.gem_port_id,
274 tp_path=del_gem_msg.tp_path)
275
276 elif request.header.type == InterAdapterMessageType.DELETE_TCONT_REQUEST:
277 del_tcont_msg = InterAdapterDeleteTcontMessage()
278 request.body.Unpack(del_tcont_msg)
279 self.logger.debug('inter-adapter-recv-del-tcont', del_tcont_msg=del_tcont_msg)
280
281 self.delete_tech_profile(uni_id=del_tcont_msg.uni_id,
282 alloc_id=del_tcont_msg.alloc_id,
283 tp_path=del_tcont_msg.tp_path)
284 else:
285 self.logger.error("inter-adapter-unhandled-type", request=request)
286 */
287 return nil
288}
289
290// DeviceHandler methods that implement the adapters interface requests## end #########
291// #####################################################################################
292
293// ################ to be updated acc. needs of ONU Device ########################
294// DeviceHandler StateMachine related state transition methods ##### begin #########
295
296func (dh *DeviceHandler) logStateChange(e *fsm.Event) {
297 logger.Debugw("Device FSM: ", log.Fields{"event name": string(e.Event), "src state": string(e.Src), "dst state": string(e.Dst), "device-id": dh.deviceID})
298}
299
300// doStateInit provides the device update to the core
301func (dh *DeviceHandler) doStateInit(e *fsm.Event) {
302
303 logger.Debug("doStateInit-started")
304 var err error
305
306 /*
307 var err error
308 dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure(), grpc.WithBlock())
309 if err != nil {
310 logger.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceID, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
311 return err
312 }
313 return nil
314 */
315
316 // populate what we know. rest comes later after mib sync
317 dh.device.Root = false
318 dh.device.Vendor = "OpenONU"
319 dh.device.Model = "go"
320 dh.device.Reason = "activating-onu"
321 dh.logicalDeviceID = dh.deviceID
322
323 dh.coreProxy.DeviceUpdate(context.TODO(), dh.device)
324
325 // store proxy parameters for later communication - assumption: invariant, else they have to be requested dynamically!!
326 dh.ProxyAddressID = dh.device.ProxyAddress.GetDeviceId()
327 dh.ProxyAddressType = dh.device.ProxyAddress.GetDeviceType()
328 logger.Debugw("device-updated", log.Fields{"deviceID": dh.deviceID, "proxyAddressID": dh.ProxyAddressID,
329 "proxyAddressType": dh.ProxyAddressType, "SNR": dh.device.SerialNumber,
330 "ParentId": dh.device.ParentId, "ParentPortNo": dh.device.ParentPortNo})
331
332 /*
333 self._pon = PonPort.create(self, self._pon_port_number)
334 self._pon.add_peer(self.parent_id, self._pon_port_number)
335 self.logger.debug('adding-pon-port-to-agent',
336 type=self._pon.get_port().type,
337 admin_state=self._pon.get_port().admin_state,
338 oper_status=self._pon.get_port().oper_status,
339 )
340 */
341 logger.Debug("adding-pon-port")
342 pPonPortNo := uint32(1)
343 if dh.device.ParentPortNo != 0 {
344 pPonPortNo = dh.device.ParentPortNo
345 }
346
347 pPonPort := &voltha.Port{
348 PortNo: pPonPortNo,
349 Label: fmt.Sprintf("pon-%d", pPonPortNo),
350 Type: voltha.Port_PON_ONU,
351 OperStatus: voltha.OperStatus_ACTIVE,
352 Peers: []*voltha.Port_PeerPort{{DeviceId: dh.device.ParentId, // Peer device is OLT
353 PortNo: dh.device.ParentPortNo}}, // Peer port is parent's port number
354 }
355 if err = dh.coreProxy.PortCreated(context.TODO(), dh.deviceID, pPonPort); err != nil {
356 logger.Fatalf("Device FSM: PortCreated-failed-%s", err)
357 e.Cancel(err)
358 return
359 }
360 logger.Debug("doStateInit-done")
361}
362
363// postInit setups the DeviceEntry for the conerned device
364func (dh *DeviceHandler) postInit(e *fsm.Event) {
365
366 logger.Debug("postInit-started")
367 var err error
368 /*
369 dh.Client = oop.NewOpenoltClient(dh.clientCon)
370 dh.pTransitionMap.Handle(ctx, GrpcConnected)
371 return nil
372 */
373 if err = dh.Add_OnuDeviceEntry(context.TODO()); err != nil {
374 logger.Fatalf("Device FSM: Add_OnuDeviceEntry-failed-%s", err)
375 e.Cancel(err)
376 return
377 }
378
379 /*
380 ############################################################################
381 # Setup Alarm handler
382 self.events = AdapterEvents(self.core_proxy, device.id, self.logical_device_id,
383 device.serial_number)
384 ############################################################################
385 # Setup PM configuration for this device
386 # Pass in ONU specific options
387 kwargs = {
388 OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
389 'heartbeat': self.heartbeat,
390 OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
391 }
392 self.logger.debug('create-pm-metrics', device_id=device.id, serial_number=device.serial_number)
393 self._pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
394 self.logical_device_id, device.serial_number,
395 grouped=True, freq_override=False, **kwargs)
396 pm_config = self._pm_metrics.make_proto()
397 self._onu_omci_device.set_pm_config(self._pm_metrics.omci_pm.openomci_interval_pm)
398 self.logger.info("initial-pm-config", device_id=device.id, serial_number=device.serial_number)
399 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
400
401 # Note, ONU ID and UNI intf set in add_uni_port method
402 self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
403 ani_ports=[self._pon])
404
405 # Code to Run OMCI Test Action
406 kwargs_omci_test_action = {
407 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
408 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
409 }
410 serial_number = device.serial_number
411 self._test_request = OmciTestRequest(self.core_proxy,
412 self.omci_agent, self.device_id,
413 AniG, serial_number,
414 self.logical_device_id,
415 exclusive=False,
416 **kwargs_omci_test_action)
417
418 self.enabled = True
419 else:
420 self.logger.info('onu-already-activated')
421 */
422 logger.Debug("postInit-done")
423}
424
425// doStateConnected get the device info and update to voltha core
426// for comparison of the original method (not that easy to uncomment): compare here:
427// voltha-openolt-adapter/adaptercore/device_handler.go
428// -> this one obviously initiates all communication interfaces of the device ...?
429func (dh *DeviceHandler) doStateConnected(e *fsm.Event) {
430
431 logger.Debug("doStateConnected-started")
432 var err error
433 err = errors.New("Device FSM: function not implemented yet!")
434 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000435 logger.Debug("doStateConnected-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700436 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000437}
438
439// doStateUp handle the onu up indication and update to voltha core
440func (dh *DeviceHandler) doStateUp(e *fsm.Event) {
441
442 logger.Debug("doStateUp-started")
443 var err error
444 err = errors.New("Device FSM: function not implemented yet!")
445 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000446 logger.Debug("doStateUp-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700447 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000448
449 /*
450 // Synchronous call to update device state - this method is run in its own go routine
451 if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
452 voltha.OperStatus_ACTIVE); err != nil {
453 logger.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceID": dh.device.Id, "error": err})
454 return err
455 }
456 return nil
457 */
458}
459
460// doStateDown handle the onu down indication
461func (dh *DeviceHandler) doStateDown(e *fsm.Event) {
462
463 logger.Debug("doStateDown-started")
464 var err error
465
466 device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
467 if err != nil || device == nil {
468 /*TODO: needs to handle error scenarios */
469 logger.Errorw("Failed to fetch device device", log.Fields{"err": err})
470 e.Cancel(err)
471 return
472 }
473
474 cloned := proto.Clone(device).(*voltha.Device)
475 logger.Debugw("do-state-down", log.Fields{"ClonedDeviceID": cloned.Id})
476 /*
477 // Update the all ports state on that device to disable
478 if er := dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
479 logger.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": er})
480 return er
481 }
482
483 //Update the device oper state and connection status
484 cloned.OperStatus = voltha.OperStatus_UNKNOWN
485 cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
486 dh.device = cloned
487
488 if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
489 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": er})
490 return er
491 }
492
493 //get the child device for the parent device
494 onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
495 if err != nil {
496 logger.Errorw("failed to get child devices information", log.Fields{"deviceID": dh.device.Id, "error": err})
497 return err
498 }
499 for _, onuDevice := range onuDevices.Items {
500
501 // Update onu state as down in onu adapter
502 onuInd := oop.OnuIndication{}
503 onuInd.OperState = "down"
504 er := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
505 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
506 if er != nil {
507 logger.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
508 "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
509 //Do not return here and continue to process other ONUs
510 }
511 }
512 // * Discovered ONUs entries need to be cleared , since after OLT
513 // is up, it starts sending discovery indications again* /
514 dh.discOnus = sync.Map{}
515 logger.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
516 return nil
517 */
518 err = errors.New("Device FSM: function not implemented yet!")
519 e.Cancel(err)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000520 logger.Debug("doStateDown-done")
Matteo Scandolod132c0e2020-04-24 17:06:25 -0700521 return
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000522}
523
524// DeviceHandler StateMachine related state transition methods ##### end #########
525// #################################################################################
526
527// ###################################################
528// DeviceHandler utility methods ##### begin #########
529
530// Get ONU device entry for this deviceId specific handler
531func (dh *DeviceHandler) GetOnuDeviceEntry() *OnuDeviceEntry {
532 dh.lockDevice.Lock()
533 defer dh.lockDevice.Unlock()
534 if dh.pOnuOmciDevice != nil {
535 logger.Debugw("GetOnuDeviceEntry params:",
536 log.Fields{"onu_device_entry": dh.pOnuOmciDevice, "device_id": dh.pOnuOmciDevice.deviceID,
537 "device_handler": dh.pOnuOmciDevice.baseDeviceHandler, "core_proxy": dh.pOnuOmciDevice.coreProxy, "adapter_proxy": dh.pOnuOmciDevice.adapterProxy})
538 } else {
539 logger.Error("GetOnuDeviceEntry returns nil")
540 }
541 return dh.pOnuOmciDevice
542}
543
544// Set ONU device entry
545func (dh *DeviceHandler) SetOnuDeviceEntry(pDeviceEntry *OnuDeviceEntry) error {
546 dh.lockDevice.Lock()
547 defer dh.lockDevice.Unlock()
548 dh.pOnuOmciDevice = pDeviceEntry
549 return nil
550}
551
552//creates a new ONU device or returns the existing
553func (dh *DeviceHandler) Add_OnuDeviceEntry(ctx context.Context) error {
554 logger.Debugw("adding-deviceEntry", log.Fields{"for deviceId": dh.deviceID})
555
556 deviceEntry := dh.GetOnuDeviceEntry()
557 if deviceEntry == nil {
558 /* costum_me_map in python code seems always to be None,
559 we omit that here first (declaration unclear) -> todo at Adapter specialization ...*/
560 /* also no 'clock' argument - usage open ...*/
561 /* and no alarm_db yet (oo.alarm_db) */
562 deviceEntry = NewOnuDeviceEntry(ctx, dh.deviceID, dh, dh.coreProxy, dh.AdapterProxy,
563 dh.pOpenOnuAc.pSupportedFsms) //nil as FSM pointer would yield deviceEntry internal defaults ...
564 //error treatment possible //TODO!!!
565 dh.SetOnuDeviceEntry(deviceEntry)
566 logger.Infow("onuDeviceEntry-added", log.Fields{"for deviceId": dh.deviceID})
567 } else {
568 logger.Infow("onuDeviceEntry-add: Device already exists", log.Fields{"for deviceId": dh.deviceID})
569 }
570 // might be updated with some error handling !!!
571 return nil
572}
573
574// doStateInit provides the device update to the core
575func (dh *DeviceHandler) create_interface(onuind *oop.OnuIndication) error {
576 logger.Debug("create_interface-started - not yet fully implemented (only device state update)")
577
578 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING); err != nil {
579 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.deviceID, "error": err})
580 }
581
582 device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
583 if err != nil || device == nil {
584 /*TODO: needs to handle error scenarios */
585 logger.Errorw("Failed to fetch device device at creating If", log.Fields{"err": err})
586 return errors.New("Voltha Device not found")
587 }
588
589 dh.GetOnuDeviceEntry().Start(context.TODO())
590 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "starting-openomci"); err != nil {
591 logger.Errorw("error-DeviceReasonUpdate to starting-openomci", log.Fields{"deviceID": dh.deviceID, "error": err})
592 }
593
594 /* this might be a good time for Omci Verify message? */
595 verifyExec := make(chan bool)
596 omci_verify := NewOmciTestRequest(context.TODO(),
597 dh.device.Id, dh.GetOnuDeviceEntry().PDevOmciCC,
598 true, true) //eclusive and allowFailure (anyway not yet checked)
599 omci_verify.PerformOmciTest(context.TODO(), verifyExec)
600
601 /* give the handler some time here to wait for the OMCi verification result
602 after Timeout start and try MibUpload FSM anyway
603 (to prevent stopping on just not supported OMCI verification from ONU) */
604 select {
605 case <-time.After(2 * time.Second):
606 logger.Warn("omci start-verification timed out (continue normal)")
607 case testresult := <-verifyExec:
608 logger.Infow("Omci start verification done", log.Fields{"result": testresult})
609 }
610
611 /* In py code it looks earlier (on activate ..)
612 # Code to Run OMCI Test Action
613 kwargs_omci_test_action = {
614 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
615 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
616 }
617 serial_number = device.serial_number
618 self._test_request = OmciTestRequest(self.core_proxy,
619 self.omci_agent, self.device_id,
620 AniG, serial_number,
621 self.logical_device_id,
622 exclusive=False,
623 **kwargs_omci_test_action)
624 ...
625 # Start test requests after a brief pause
626 if not self._test_request_started:
627 self._test_request_started = True
628 tststart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
629 reactor.callLater(tststart, self._test_request.start_collector)
630
631 */
632 /* which is then: in omci_test_request.py : */
633 /*
634 def start_collector(self, callback=None):
635 """
636 Start the collection loop for an adapter if the frequency > 0
637
638 :param callback: (callable) Function to call to collect PM data
639 """
640 self.logger.info("starting-pm-collection", device_name=self.name, default_freq=self.default_freq)
641 if callback is None:
642 callback = self.perform_test_omci
643
644 if self.lc is None:
645 self.lc = LoopingCall(callback)
646
647 if self.default_freq > 0:
648 self.lc.start(interval=self.default_freq / 10)
649
650 def perform_test_omci(self):
651 """
652 Perform the initial test request
653 """
654 ani_g_entities = self._device.configuration.ani_g_entities
655 ani_g_entities_ids = list(ani_g_entities.keys()) if ani_g_entities \
656 is not None else None
657 self._entity_id = ani_g_entities_ids[0]
658 self.logger.info('perform-test', entity_class=self._entity_class,
659 entity_id=self._entity_id)
660 try:
661 frame = MEFrame(self._entity_class, self._entity_id, []).test()
662 result = yield self._device.omci_cc.send(frame)
663 if not result.fields['omci_message'].fields['success_code']:
664 self.logger.info('Self-Test Submitted Successfully',
665 code=result.fields[
666 'omci_message'].fields['success_code'])
667 else:
668 raise TestFailure('Test Failure: {}'.format(
669 result.fields['omci_message'].fields['success_code']))
670 except TimeoutError as e:
671 self.deferred.errback(failure.Failure(e))
672
673 except Exception as e:
674 self.logger.exception('perform-test-Error', e=e,
675 class_id=self._entity_class,
676 entity_id=self._entity_id)
677 self.deferred.errback(failure.Failure(e))
678
679 */
680
681 // PM related heartbeat??? !!!TODO....
682 //self._heartbeat.enabled = True
683
684 //example how to call FSM - transition up to state "uploading"
685 if dh.GetOnuDeviceEntry().MibSyncFsm.Is("disabled") {
686
687 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("start"); err != nil {
688 logger.Errorw("MibSyncFsm: Can't go to state starting", log.Fields{"err": err})
689 return errors.New("Can't go to state starting")
690 } else {
691 logger.Debug("MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
692 //Determine ONU status and start/re-start MIB Synchronization tasks
693 //Determine if this ONU has ever synchronized
694 if true { //TODO: insert valid check
695 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("load_mib_template"); err != nil {
696 logger.Errorw("MibSyncFsm: Can't go to state loading_mib_template", log.Fields{"err": err})
697 return errors.New("Can't go to state loading_mib_template")
698 } else {
699 logger.Debug("MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
700 //Find and load a mib template. If not found proceed with mib_upload
701 // callbacks to be handled:
702 // Event("success")
703 // Event("timeout")
704 //no mib template found
705 if true { //TODO: insert valid check
706 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("upload_mib"); err != nil {
707 logger.Errorw("MibSyncFsm: Can't go to state uploading", log.Fields{"err": err})
708 return errors.New("Can't go to state uploading")
709 } else {
710 logger.Debug("state of MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
711 //Begin full MIB data upload, starting with a MIB RESET
712 // callbacks to be handled:
713 // success: e.Event("success")
714 // failure: e.Event("timeout")
715 }
716 }
717 }
718 } else {
719 dh.GetOnuDeviceEntry().MibSyncFsm.Event("examine_mds")
720 logger.Debug("state of MibSyncFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
721 //Examine the MIB Data Sync
722 // callbacks to be handled:
723 // Event("success")
724 // Event("timeout")
725 // Event("mismatch")
726 }
727 }
728 } else {
729 logger.Errorw("wrong state of MibSyncFsm - want: disabled", log.Fields{"have": string(dh.GetOnuDeviceEntry().MibSyncFsm.Current())})
730 return errors.New("wrong state of MibSyncFsm")
731 }
732 return nil
733}
734
735func (dh *DeviceHandler) update_interface(onuind *oop.OnuIndication) error {
736 logger.Debug("update_interface-started - not yet implemented")
737 return nil
738}
739
740func (dh *DeviceHandler) DeviceStateUpdate(dev_Event OnuDeviceEvent) {
741 if dev_Event == MibDatabaseSync {
742 logger.Debug("MibInSync event: update dev state to 'MibSync complete'")
743 //initiate DevStateUpdate
744 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "discovery-mibsync-complete"); err != nil {
745 logger.Errorw("error-DeviceReasonUpdate to 'mibsync-complete'", log.Fields{"deviceID": dh.deviceID, "error": err})
746 }
747
748 // fixed assumption about PPTP/UNI-G ONU-config
749 // to be replaced by DB parsing of MibUpload data TODO!!!
750 // parameters are: InstanceNo, running UniNo, type
751 dh.addUniPort(257, 0, UniPPTP)
752 dh.addUniPort(258, 1, UniPPTP)
753 dh.addUniPort(259, 2, UniPPTP)
754 dh.addUniPort(260, 3, UniPPTP)
755
756 // start the MibDownload (assumed here to be done via some FSM again - open //TODO!!!)
757 /* the mib-download code may look something like that:
758 if err := dh.GetOnuDeviceEntry().MibDownloadFsm.Event("start"); err != nil {
759 logger.Errorw("MibDownloadFsm: Can't go to state starting", log.Fields{"err": err})
760 return errors.New("Can't go to state starting")
761 } else {
762 logger.Debug("MibDownloadFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibDownloadFsm.Current())})
763 //Determine ONU status and start/re-start MIB MibDownloadFsm
764 //Determine if this ONU has ever synchronized
765 if true { //TODO: insert valid check
766 if err := dh.GetOnuDeviceEntry().MibSyncFsm.Event("download_mib"); err != nil {
767 logger.Errorw("MibDownloadFsm: Can't go to state 'download_mib'", log.Fields{"err": err})
768 return errors.New("Can't go to state 'download_mib'")
769 } else {
770 //some further processing ???
771 logger.Debug("state of MibDownloadFsm", log.Fields{"state": string(dh.GetOnuDeviceEntry().MibDownloadFsm.Current())})
772 //some further processing ???
773 }
774 }
775 }
776 but by now we shortcut the download here and immediately fake the ONU-active state to get the state indication on ONUS!!!:
777 */
778 //shortcut code to fake download-done!!!:
779 go dh.GetOnuDeviceEntry().transferSystemEvent(MibDownloadDone)
780 } else if dev_Event == MibDownloadDone {
781 logger.Debug("MibDownloadDone event: update dev state to 'Oper.Active'")
782 //initiate DevStateUpdate
783 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID,
784 voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
785 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.deviceID, "error": err})
786 }
787 logger.Debug("MibDownloadDone Event: update dev reasone to 'initial-mib-downloaded'")
788 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "initial-mib-downloaded"); err != nil {
789 logger.Errorw("error-DeviceReasonUpdate to 'initial-mib-downloaded'",
790 log.Fields{"deviceID": dh.deviceID, "error": err})
791 }
792
793 //TODO !!! following activities according to python code:
794 /*
795 yield self.enable_ports()
796 self._mib_download_task = None
797 yield self.onu_active_event() -> with 'OnuActiveEvent' !!! might be this is required for ONOS visibility??
798 */
799 } else {
800 logger.Warnw("unhandled-device-event", log.Fields{"event": dev_Event})
801 }
802}
803
804func (dh *DeviceHandler) addUniPort(a_uniInstNo uint16, a_uniId uint16, a_portType UniPortType) {
805 if _, present := dh.uniEntityMap[a_uniInstNo]; present {
806 logger.Warnw("onuUniPort-add: Port already exists", log.Fields{"for InstanceId": a_uniInstNo})
807 } else {
808 //TODO: need to find the ONU intfId and OnuId, using hard coded values for single ONU test!!!
809 // parameters are IntfId, OnuId, uniId
810 uni_no := MkUniPortNum(0, 1, uint32(a_uniId))
811 //with arguments a_uniId, a_portNo, a_portType
812 pUniPort := NewOnuUniPort(a_uniId, uni_no, a_uniInstNo, a_portType)
813 if pUniPort == nil {
814 logger.Warnw("onuUniPort-add: Could not create Port", log.Fields{"for InstanceId": a_uniInstNo})
815 } else {
816 dh.uniEntityMap[a_uniInstNo] = pUniPort
817 // create announce the UniPort to the core as VOLTHA Port object
818 if err := pUniPort.CreateVolthaPort(dh); err != nil {
819 logger.Infow("onuUniPort-added", log.Fields{"for InstanceId": a_uniInstNo})
820 } //error looging already within UniPort method
821 }
822 }
823}