blob: 9d00d5abe7092c49b0d8e0cff09cd318d9185d12 [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package adaptercoreonu provides the utility for onu devices, flows and statistics
18package adaptercoreonu
19
20import (
21 "context"
22 "encoding/hex"
23 "errors"
24 "fmt"
25 "strconv"
26 "sync"
27 "time"
28
29 "github.com/gogo/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/looplab/fsm"
32 me "github.com/opencord/omci-lib-go/generated"
33 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
34 "github.com/opencord/voltha-lib-go/v3/pkg/db"
35 flow "github.com/opencord/voltha-lib-go/v3/pkg/flows"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 vc "github.com/opencord/voltha-protos/v3/go/common"
38 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
39 "github.com/opencord/voltha-protos/v3/go/openflow_13"
40 of "github.com/opencord/voltha-protos/v3/go/openflow_13"
41 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
42 oop "github.com/opencord/voltha-protos/v3/go/openolt"
43 "github.com/opencord/voltha-protos/v3/go/voltha"
44)
45
46/*
47// Constants for number of retries and for timeout
48const (
49 MaxRetry = 10
50 MaxTimeOutInMs = 500
51)
52*/
53
54const (
55 devEvDeviceInit = "devEvDeviceInit"
56 devEvGrpcConnected = "devEvGrpcConnected"
57 devEvGrpcDisconnected = "devEvGrpcDisconnected"
58 devEvDeviceUpInd = "devEvDeviceUpInd"
59 devEvDeviceDownInd = "devEvDeviceDownInd"
60)
61const (
62 devStNull = "devStNull"
63 devStDown = "devStDown"
64 devStInit = "devStInit"
65 devStConnected = "devStConnected"
66 devStUp = "devStUp"
67)
68
69//Event category and subcategory definitions - same as defiend for OLT in eventmgr.go - should be done more centrally
70const (
71 pon = voltha.EventSubCategory_PON
72 equipment = voltha.EventCategory_EQUIPMENT
73)
74
75const (
76 cEventObjectType = "ONU"
77)
78const (
79 cOnuActivatedEvent = "ONU_ACTIVATED"
80)
81
82//deviceHandler will interact with the ONU ? device.
83type deviceHandler struct {
84 deviceID string
85 DeviceType string
86 adminState string
87 device *voltha.Device
88 logicalDeviceID string
89 ProxyAddressID string
90 ProxyAddressType string
91 parentID string
92 ponPortNumber uint32
93
94 coreProxy adapterif.CoreProxy
95 AdapterProxy adapterif.AdapterProxy
96 EventProxy adapterif.EventProxy
97
98 pOpenOnuAc *OpenONUAC
99 pDeviceStateFsm *fsm.FSM
100 deviceEntrySet chan bool //channel for DeviceEntry set event
101 pOnuOmciDevice *OnuDeviceEntry
102 pOnuTP *onuUniTechProf
103 exitChannel chan int
104 lockDevice sync.RWMutex
105 pOnuIndication *oop.OnuIndication
106 deviceReason string
107 pLockStateFsm *lockStateFsm
108 pUnlockStateFsm *lockStateFsm
109
110
111 stopCollector chan bool
112 stopHeartbeatCheck chan bool
113 activePorts sync.Map
114 uniEntityMap map[uint32]*onuUniPort
115 UniVlanConfigFsmMap map[uint8]*UniVlanConfigFsm
116 reconciling bool
117}
118
119//newDeviceHandler creates a new device handler
120func newDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenONUAC) *deviceHandler {
121 var dh deviceHandler
122 dh.coreProxy = cp
123 dh.AdapterProxy = ap
124 dh.EventProxy = ep
125 cloned := (proto.Clone(device)).(*voltha.Device)
126 dh.deviceID = cloned.Id
127 dh.DeviceType = cloned.Type
128 dh.adminState = "up"
129 dh.device = cloned
130 dh.pOpenOnuAc = adapter
131 dh.exitChannel = make(chan int, 1)
132 dh.lockDevice = sync.RWMutex{}
133 dh.deviceEntrySet = make(chan bool, 1)
134 dh.stopCollector = make(chan bool, 2)
135 dh.stopHeartbeatCheck = make(chan bool, 2)
136 dh.activePorts = sync.Map{}
137 dh.uniEntityMap = make(map[uint32]*onuUniPort)
138 dh.UniVlanConfigFsmMap = make(map[uint8]*UniVlanConfigFsm)
139 dh.reconciling = false
140
141 dh.pDeviceStateFsm = fsm.NewFSM(
142 devStNull,
143 fsm.Events{
144 {Name: devEvDeviceInit, Src: []string{devStNull, devStDown}, Dst: devStInit},
145 {Name: devEvGrpcConnected, Src: []string{devStInit}, Dst: devStConnected},
146 {Name: devEvGrpcDisconnected, Src: []string{devStConnected, devStDown}, Dst: devStInit},
147 {Name: devEvDeviceUpInd, Src: []string{devStConnected, devStDown}, Dst: devStUp},
148 {Name: devEvDeviceDownInd, Src: []string{devStUp}, Dst: devStDown},
149 },
150 fsm.Callbacks{
151 "before_event": func(e *fsm.Event) { dh.logStateChange(e) },
152 ("before_" + devEvDeviceInit): func(e *fsm.Event) { dh.doStateInit(e) },
153 ("after_" + devEvDeviceInit): func(e *fsm.Event) { dh.postInit(e) },
154 ("before_" + devEvGrpcConnected): func(e *fsm.Event) { dh.doStateConnected(e) },
155 ("before_" + devEvGrpcDisconnected): func(e *fsm.Event) { dh.doStateInit(e) },
156 ("after_" + devEvGrpcDisconnected): func(e *fsm.Event) { dh.postInit(e) },
157 ("before_" + devEvDeviceUpInd): func(e *fsm.Event) { dh.doStateUp(e) },
158 ("before_" + devEvDeviceDownInd): func(e *fsm.Event) { dh.doStateDown(e) },
159 },
160 )
161
162 return &dh
163}
164
165// start save the device to the data model
166func (dh *deviceHandler) start(ctx context.Context) {
167 logger.Debugw("starting-device-handler", log.Fields{"device": dh.device, "device-id": dh.deviceID})
168 logger.Debug("device-handler-started")
169}
170
171/*
172// stop stops the device dh. Not much to do for now
173func (dh *deviceHandler) stop(ctx context.Context) {
174 logger.Debug("stopping-device-handler")
175 dh.exitChannel <- 1
176}
177*/
178
179// ##########################################################################################
180// deviceHandler methods that implement the adapters interface requests ##### begin #########
181
182//adoptOrReconcileDevice adopts the OLT device
183func (dh *deviceHandler) adoptOrReconcileDevice(ctx context.Context, device *voltha.Device) {
184 logger.Debugw("Adopt_or_reconcile_device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
185
186 logger.Debugw("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
187 if dh.pDeviceStateFsm.Is(devStNull) {
188 if err := dh.pDeviceStateFsm.Event(devEvDeviceInit); err != nil {
189 logger.Errorw("Device FSM: Can't go to state DeviceInit", log.Fields{"err": err})
190 }
191 logger.Debugw("Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
192 } else {
193 logger.Debugw("AdoptOrReconcileDevice: Agent/device init already done", log.Fields{"device-id": device.Id})
194 }
195
196}
197
198func (dh *deviceHandler) processInterAdapterOMCIReqMessage(msg *ic.InterAdapterMessage) error {
199 msgBody := msg.GetBody()
200 omciMsg := &ic.InterAdapterOmciMessage{}
201 if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
202 logger.Warnw("cannot-unmarshal-omci-msg-body", log.Fields{
203 "device-id": dh.deviceID, "error": err})
204 return err
205 }
206
207 logger.Debugw("inter-adapter-recv-omci", log.Fields{
208 "device-id": dh.deviceID, "RxOmciMessage": hex.EncodeToString(omciMsg.Message)})
209 pDevEntry := dh.getOnuDeviceEntry(true)
210 if pDevEntry != nil {
211 return pDevEntry.PDevOmciCC.receiveMessage(context.TODO(), omciMsg.Message)
212 }
213 logger.Errorw("No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
214 return errors.New("no valid OnuDevice")
215}
216
217func (dh *deviceHandler) processInterAdapterONUIndReqMessage(msg *ic.InterAdapterMessage) error {
218 msgBody := msg.GetBody()
219 onuIndication := &oop.OnuIndication{}
220 if err := ptypes.UnmarshalAny(msgBody, onuIndication); err != nil {
221 logger.Warnw("cannot-unmarshal-onu-indication-msg-body", log.Fields{
222 "device-id": dh.deviceID, "error": err})
223 return err
224 }
225
226 onuOperstate := onuIndication.GetOperState()
227 logger.Debugw("inter-adapter-recv-onu-ind", log.Fields{"OnuId": onuIndication.GetOnuId(),
228 "AdminState": onuIndication.GetAdminState(), "OperState": onuOperstate,
229 "SNR": onuIndication.GetSerialNumber()})
230
231 if onuOperstate == "up" {
232 _ = dh.createInterface(onuIndication)
233 } else if (onuOperstate == "down") || (onuOperstate == "unreachable") {
234 _ = dh.updateInterface(onuIndication)
235 } else {
236 logger.Errorw("unknown-onu-indication operState", log.Fields{"OnuId": onuIndication.GetOnuId()})
237 return errors.New("invalidOperState")
238 }
239 return nil
240}
241
242func (dh *deviceHandler) processInterAdapterTechProfileDownloadReqMessage(
243 msg *ic.InterAdapterMessage) error {
244 if dh.pOnuTP == nil {
245 //should normally not happen ...
246 logger.Warnw("onuTechProf instance not set up for DLMsg request - ignoring request",
247 log.Fields{"device-id": dh.deviceID})
248 return errors.New("techProfile DLMsg request while onuTechProf instance not setup")
249 }
250 if (dh.deviceReason == "stopping-openomci") || (dh.deviceReason == "omci-admin-lock") {
251 // I've seen cases for this request, where the device was already stopped
252 logger.Warnw("TechProf stopped: device-unreachable", log.Fields{"device-id": dh.deviceID})
253 return errors.New("device-unreachable")
254 }
255
256 msgBody := msg.GetBody()
257 techProfMsg := &ic.InterAdapterTechProfileDownloadMessage{}
258 if err := ptypes.UnmarshalAny(msgBody, techProfMsg); err != nil {
259 logger.Warnw("cannot-unmarshal-techprof-msg-body", log.Fields{
260 "device-id": dh.deviceID, "error": err})
261 return err
262 }
263
264 dh.pOnuTP.lockTpProcMutex()
265 if bTpModify := dh.pOnuTP.updateOnuUniTpPath(techProfMsg.UniId, techProfMsg.Path); bTpModify {
266 // if there has been some change for some uni TechProfilePath
267 //in order to allow concurrent calls to other dh instances we do not wait for execution here
268 //but doing so we can not indicate problems to the caller (who does what with that then?)
269 //by now we just assume straightforward successful execution
270 //TODO!!! Generally: In this scheme it would be good to have some means to indicate
271 // possible problems to the caller later autonomously
272
273 // deadline context to ensure completion of background routines waited for
274 //20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
275 deadline := time.Now().Add(30 * time.Second) //allowed run time to finish before execution
276 dctx, cancel := context.WithDeadline(context.Background(), deadline)
277
278 dh.pOnuTP.resetProcessingErrorIndication()
279 var wg sync.WaitGroup
280 wg.Add(2) // for the 2 go routines to finish
281 // attention: deadline completion check and wg.Done is to be done in both routines
282 go dh.pOnuTP.configureUniTp(dctx, uint8(techProfMsg.UniId), techProfMsg.Path, &wg)
283 go dh.pOnuTP.updateOnuTpPathKvStore(dctx, &wg)
284 //the wait.. function is responsible for tpProcMutex.Unlock()
285 err := dh.pOnuTP.waitForTpCompletion(cancel, &wg) //wait for background process to finish and collect their result
286 return err
287 }
288 dh.pOnuTP.unlockTpProcMutex()
289 return nil
290}
291
292func (dh *deviceHandler) processInterAdapterDeleteGemPortReqMessage(
293 msg *ic.InterAdapterMessage) error {
294
295 if dh.pOnuTP == nil {
296 //should normally not happen ...
297 logger.Warnw("onuTechProf instance not set up for DelGem request - ignoring request",
298 log.Fields{"device-id": dh.deviceID})
299 return errors.New("techProfile DelGem request while onuTechProf instance not setup")
300 }
301
302 msgBody := msg.GetBody()
303 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{}
304 if err := ptypes.UnmarshalAny(msgBody, delGemPortMsg); err != nil {
305 logger.Warnw("cannot-unmarshal-delete-gem-msg-body", log.Fields{
306 "device-id": dh.deviceID, "error": err})
307 return err
308 }
309
310 dh.pOnuTP.lockTpProcMutex()
311
312 deadline := time.Now().Add(10 * time.Second) //allowed run time to finish before execution
313 dctx, cancel := context.WithDeadline(context.Background(), deadline)
314
315 dh.pOnuTP.resetProcessingErrorIndication()
316 var wg sync.WaitGroup
317 wg.Add(1) // for the 1 go routine to finish
318 go dh.pOnuTP.deleteTpResource(dctx, delGemPortMsg.UniId, delGemPortMsg.TpPath,
319 cResourceGemPort, delGemPortMsg.GemPortId, &wg)
320 err := dh.pOnuTP.waitForTpCompletion(cancel, &wg) //let that also run off-line to let the IA messaging return!
321 return err
322}
323
324func (dh *deviceHandler) processInterAdapterDeleteTcontReqMessage(
325 msg *ic.InterAdapterMessage) error {
326 if dh.pOnuTP == nil {
327 //should normally not happen ...
328 logger.Warnw("onuTechProf instance not set up for DelTcont request - ignoring request",
329 log.Fields{"device-id": dh.deviceID})
330 return errors.New("techProfile DelTcont request while onuTechProf instance not setup")
331 }
332
333 msgBody := msg.GetBody()
334 delTcontMsg := &ic.InterAdapterDeleteTcontMessage{}
335 if err := ptypes.UnmarshalAny(msgBody, delTcontMsg); err != nil {
336 logger.Warnw("cannot-unmarshal-delete-tcont-msg-body", log.Fields{
337 "device-id": dh.deviceID, "error": err})
338 return err
339 }
340
341 dh.pOnuTP.lockTpProcMutex()
342 if bTpModify := dh.pOnuTP.updateOnuUniTpPath(delTcontMsg.UniId, ""); bTpModify {
343 // deadline context to ensure completion of background routines waited for
344 deadline := time.Now().Add(10 * time.Second) //allowed run time to finish before execution
345 dctx, cancel := context.WithDeadline(context.Background(), deadline)
346
347 dh.pOnuTP.resetProcessingErrorIndication()
348 var wg sync.WaitGroup
349 wg.Add(2) // for the 2 go routines to finish
350 go dh.pOnuTP.deleteTpResource(dctx, delTcontMsg.UniId, delTcontMsg.TpPath,
351 cResourceTcont, delTcontMsg.AllocId, &wg)
352 // Removal of the tcont/alloc id mapping represents the removal of the tech profile
353 go dh.pOnuTP.updateOnuTpPathKvStore(dctx, &wg)
354 //the wait.. function is responsible for tpProcMutex.Unlock()
355 err := dh.pOnuTP.waitForTpCompletion(cancel, &wg) //let that also run off-line to let the IA messaging return!
356 return err
357 }
358 dh.pOnuTP.unlockTpProcMutex()
359 return nil
360}
361
362//processInterAdapterMessage sends the proxied messages to the target device
363// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
364// is meant, and then send the unmarshalled omci message to this onu
365func (dh *deviceHandler) processInterAdapterMessage(msg *ic.InterAdapterMessage) error {
366 msgID := msg.Header.Id
367 msgType := msg.Header.Type
368 fromTopic := msg.Header.FromTopic
369 toTopic := msg.Header.ToTopic
370 toDeviceID := msg.Header.ToDeviceId
371 proxyDeviceID := msg.Header.ProxyDeviceId
372 logger.Debugw("InterAdapter message header", log.Fields{"msgID": msgID, "msgType": msgType,
373 "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
374
375 switch msgType {
376 case ic.InterAdapterMessageType_OMCI_REQUEST:
377 {
378 return dh.processInterAdapterOMCIReqMessage(msg)
379 }
380 case ic.InterAdapterMessageType_ONU_IND_REQUEST:
381 {
382 return dh.processInterAdapterONUIndReqMessage(msg)
383 }
384 case ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST:
385 {
386 return dh.processInterAdapterTechProfileDownloadReqMessage(msg)
387 }
388 case ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST:
389 {
390 return dh.processInterAdapterDeleteGemPortReqMessage(msg)
391
392 }
393 case ic.InterAdapterMessageType_DELETE_TCONT_REQUEST:
394 {
395 return dh.processInterAdapterDeleteTcontReqMessage(msg)
396 }
397 default:
398 {
399 logger.Errorw("inter-adapter-unhandled-type", log.Fields{
400 "device-id": dh.deviceID, "msgType": msg.Header.Type})
401 return errors.New("unimplemented")
402 }
403 }
404}
405
406//FlowUpdateIncremental removes and/or adds the flow changes on a given device
407func (dh *deviceHandler) FlowUpdateIncremental(apOfFlowChanges *openflow_13.FlowChanges,
408 apOfGroupChanges *openflow_13.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
409
410 if apOfFlowChanges.ToRemove != nil {
411 for _, flowItem := range apOfFlowChanges.ToRemove.Items {
412 logger.Debugw("incremental flow item remove", log.Fields{"deviceId": dh.deviceID,
413 "Item": flowItem})
414 }
415 }
416 if apOfFlowChanges.ToAdd != nil {
417 for _, flowItem := range apOfFlowChanges.ToAdd.Items {
418 if flowItem.GetCookie() == 0 {
419 logger.Debugw("incremental flow add - no cookie - ignore", log.Fields{
420 "deviceId": dh.deviceID})
421 continue
422 }
423 flowInPort := flow.GetInPort(flowItem)
424 if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
425 logger.Errorw("flow inPort invalid", log.Fields{"deviceID": dh.deviceID})
426 return errors.New("flow inPort invalid")
427 } else if flowInPort == dh.ponPortNumber {
428 //this is some downstream flow
429 logger.Debugw("incremental flow ignore downstream", log.Fields{
430 "deviceId": dh.deviceID, "inPort": flowInPort})
431 continue
432 } else {
433 // this is the relevant upstream flow
434 var loUniPort *onuUniPort
435 if uniPort, exist := dh.uniEntityMap[flowInPort]; exist {
436 loUniPort = uniPort
437 } else {
438 logger.Errorw("flow inPort not found in UniPorts",
439 log.Fields{"deviceID": dh.deviceID, "inPort": flowInPort})
440 return fmt.Errorf("flow-parameter inPort %d not found in internal UniPorts", flowInPort)
441 }
442 flowOutPort := flow.GetOutPort(flowItem)
443 logger.Debugw("incremental flow-add port indications", log.Fields{
444 "deviceId": dh.deviceID, "inPort": flowInPort, "outPort": flowOutPort,
445 "uniPortName": loUniPort.name})
446 err := dh.addFlowItemToUniPort(flowItem, loUniPort)
447 //abort processing in error case
448 if err != nil {
449 return err
450 }
451 }
452 }
453 }
454 return nil
455}
456
457//disableDevice locks the ONU and its UNI/VEIP ports (admin lock via OMCI)
458// TODO!!! Clarify usage of this method, it is for sure not used within ONOS (OLT) device disable
459// maybe it is obsolete by now
460func (dh *deviceHandler) disableDevice(device *voltha.Device) {
461 logger.Debugw("disable-device", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
462
463 if dh.deviceReason != "omci-admin-lock" {
464 // disable UNI ports/ONU
465 // *** should generate UniAdminStateDone event - unrelated to DeviceProcStatusUpdate!!
466 // here the result of the processing is not checked (trusted in background) *****
467 if dh.pLockStateFsm == nil {
468 dh.createUniLockFsm(true, UniAdminStateDone)
469 } else { //LockStateFSM already init
470 dh.pLockStateFsm.setSuccessEvent(UniAdminStateDone)
471 dh.runUniLockFsm(true)
472 }
473
474 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "omci-admin-lock"); err != nil {
475 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
476 logger.Errorw("error-updating-reason-state", log.Fields{"device-id": dh.deviceID, "error": err})
477 }
478 dh.deviceReason = "omci-admin-lock"
479 //200604: ConnState improved to 'unreachable' (was not set in python-code), OperState 'unknown' seems to be best choice
480 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID, voltha.ConnectStatus_UNREACHABLE,
481 voltha.OperStatus_UNKNOWN); err != nil {
482 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
483 logger.Errorw("error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
484 }
485
486 // update json file
487 // onustat := &OnuStatus{
488 // Id: dh.deviceID,
489 // AdminState: vc.AdminState_Types_name[int32(dh.device.AdminState)],
490 // OpeState: vc.OperStatus_Types_name[int32(dh.device.OperStatus)],
491 // ConnectState: vc.ConnectStatus_Types_name[int32(dh.device.ConnectStatus)],
492 // MacAddress: dh.device.MacAddress,
493 // }
494 // logger.Debugw("UpdateOnu", log.Fields{"Id": dh.deviceID})
495 // err := UpdateOnu(onustat)
496 // logger.Debugw("UpdateOnu", log.Fields{"err": err})
497 }
498}
499
500//reEnableDevice unlocks the ONU and its UNI/VEIP ports (admin unlock via OMCI)
501// TODO!!! Clarify usage of this method, compare above DisableDevice, usage may clarify resulting states
502// maybe it is obsolete by now
503func (dh *deviceHandler) reEnableDevice(device *voltha.Device) {
504 logger.Debugw("reenable-device", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
505
506 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID, voltha.ConnectStatus_REACHABLE,
507 voltha.OperStatus_ACTIVE); err != nil {
508 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
509 logger.Errorw("error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
510 }
511
512 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "initial-mib-downloaded"); err != nil {
513 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
514 logger.Errorw("error-updating-reason-state", log.Fields{"device-id": dh.deviceID, "error": err})
515 }
516 dh.deviceReason = "initial-mib-downloaded"
517
518
519 if dh.pUnlockStateFsm == nil {
520 dh.createUniLockFsm(false, UniAdminStateDone)
521 } else { //UnlockStateFSM already init
522 dh.pUnlockStateFsm.setSuccessEvent(UniAdminStateDone)
523 dh.runUniLockFsm(false)
524 }
525}
526
527func (dh *deviceHandler) reconcileDeviceOnuInd() {
528 logger.Debugw("reconciling - simulate onu indication", log.Fields{"device-id": dh.deviceID})
529
530 if err := dh.pOnuTP.restoreFromOnuTpPathKvStore(context.TODO()); err != nil {
531 logger.Errorw("reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
532 dh.reconciling = false
533 return
534 }
535 var onuIndication oop.OnuIndication
536 onuIndication.IntfId = dh.pOnuTP.sOnuPersistentData.PersIntfID
537 onuIndication.OnuId = dh.pOnuTP.sOnuPersistentData.PersOnuID
538 onuIndication.OperState = dh.pOnuTP.sOnuPersistentData.PersOperState
539 onuIndication.AdminState = dh.pOnuTP.sOnuPersistentData.PersAdminState
540 _ = dh.createInterface(&onuIndication)
541}
542
543func (dh *deviceHandler) reconcileDeviceTechProf() {
544 logger.Debugw("reconciling - trigger tech profile config", log.Fields{"device-id": dh.deviceID})
545
546 dh.pOnuTP.lockTpProcMutex()
547 for _, uniData := range dh.pOnuTP.sOnuPersistentData.PersUniTpPath {
548 //In order to allow concurrent calls to other dh instances we do not wait for execution here
549 //but doing so we can not indicate problems to the caller (who does what with that then?)
550 //by now we just assume straightforward successful execution
551 //TODO!!! Generally: In this scheme it would be good to have some means to indicate
552 // possible problems to the caller later autonomously
553
554 // deadline context to ensure completion of background routines waited for
555 //20200721: 10s proved to be less in 8*8 ONU test on local vbox machine with debug, might be further adapted
556 deadline := time.Now().Add(30 * time.Second) //allowed run time to finish before execution
557 dctx, cancel := context.WithDeadline(context.Background(), deadline)
558
559 dh.pOnuTP.resetProcessingErrorIndication()
560 var wg sync.WaitGroup
561 wg.Add(1) // for the 1 go routines to finish
562 // attention: deadline completion check and wg.Done is to be done in both routines
563 go dh.pOnuTP.configureUniTp(dctx, uint8(uniData.PersUniID), uniData.PersTpPath, &wg)
564 //the wait.. function is responsible for tpProcMutex.Unlock()
565 _ = dh.pOnuTP.waitForTpCompletion(cancel, &wg) //wait for background process to finish and collect their result
566 return
567 }
568 dh.pOnuTP.unlockTpProcMutex()
569 dh.reconciling = false
570}
571
572func (dh *deviceHandler) deleteDevice(device *voltha.Device) error {
573 logger.Debugw("delete-device", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
574 if err := dh.pOnuTP.deleteOnuTpPathKvStore(context.TODO()); err != nil {
575 return err
576 }
577 return nil
578}
579
580func (dh *deviceHandler) rebootDevice(device *voltha.Device) error {
581 logger.Debugw("reboot-device", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
582 if device.ConnectStatus != voltha.ConnectStatus_REACHABLE {
583 logger.Errorw("device-unreachable", log.Fields{"device-id": device.Id, "SerialNumber": device.SerialNumber})
584 return errors.New("device-unreachable")
585 }
586 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID, voltha.ConnectStatus_UNREACHABLE,
587 voltha.OperStatus_DISCOVERED); err != nil {
588 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
589 logger.Errorw("error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
590 return err
591 }
592 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "rebooting-onu"); err != nil {
593 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
594 logger.Errorw("error-updating-reason-state", log.Fields{"device-id": dh.deviceID, "error": err})
595 return err
596 }
597 dh.deviceReason = "rebooting-onu"
598
599 if err := dh.sendMsgToOlt(context.TODO(), "reboot", nil); err != nil {
600 logger.Debugw("error", log.Fields{"error": err})
601 }
602
603 return nil
604}
605
606// deviceHandler methods that implement the adapters interface requests## end #########
607// #####################################################################################
608
609// ################ to be updated acc. needs of ONU Device ########################
610// deviceHandler StateMachine related state transition methods ##### begin #########
611
612func (dh *deviceHandler) logStateChange(e *fsm.Event) {
613 logger.Debugw("Device FSM: ", log.Fields{"event name": string(e.Event), "src state": string(e.Src), "dst state": string(e.Dst), "device-id": dh.deviceID})
614}
615
616// doStateInit provides the device update to the core
617func (dh *deviceHandler) doStateInit(e *fsm.Event) {
618
619 logger.Debug("doStateInit-started")
620 var err error
621
622 dh.device.Root = false
623 dh.device.Vendor = "OpenONU"
624 dh.device.Model = "go"
625 dh.device.Reason = "activating-onu"
626 dh.deviceReason = "activating-onu"
627
628 dh.logicalDeviceID = dh.deviceID // really needed - what for ??? //TODO!!!
629
630 if !dh.reconciling {
631 _ = dh.coreProxy.DeviceUpdate(context.TODO(), dh.device)
632 } else {
633 logger.Debugw("reconciling - don't notify core about DeviceUpdate",
634 log.Fields{"device-id": dh.deviceID})
635 }
636
637 dh.parentID = dh.device.ParentId
638 dh.ponPortNumber = dh.device.ParentPortNo
639
640 dh.ProxyAddressID = dh.device.ProxyAddress.GetDeviceId()
641 dh.ProxyAddressType = dh.device.ProxyAddress.GetDeviceType()
642 logger.Debugw("device-updated", log.Fields{"device-id": dh.deviceID, "proxyAddressID": dh.ProxyAddressID,
643 "proxyAddressType": dh.ProxyAddressType, "SNR": dh.device.SerialNumber,
644 "ParentId": dh.parentID, "ParentPortNo": dh.ponPortNumber})
645
646 /*
647 self._pon = PonPort.create(self, self._pon_port_number)
648 self._pon.add_peer(self.parent_id, self._pon_port_number)
649 self.logger.debug('adding-pon-port-to-agent',
650 type=self._pon.get_port().type,
651 admin_state=self._pon.get_port().admin_state,
652 oper_status=self._pon.get_port().oper_status,
653 )
654 */
655 if !dh.reconciling {
656 logger.Debugw("adding-pon-port", log.Fields{"deviceID": dh.deviceID, "ponPortNo": dh.ponPortNumber})
657 var ponPortNo uint32 = 1
658 if dh.ponPortNumber != 0 {
659 ponPortNo = dh.ponPortNumber
660 }
661
662 pPonPort := &voltha.Port{
663 PortNo: ponPortNo,
664 //Label: fmt.Sprintf("pon-%d", ponPortNo),
665 Label: "PON port",
666 Type: voltha.Port_PON_ONU,
667 OperStatus: voltha.OperStatus_ACTIVE,
668 Peers: []*voltha.Port_PeerPort{{DeviceId: dh.parentID, // Peer device is OLT
669 PortNo: ponPortNo}}, // Peer port is parent's port number
670 }
671 if err = dh.coreProxy.PortCreated(context.TODO(), dh.deviceID, pPonPort); err != nil {
672 logger.Fatalf("Device FSM: PortCreated-failed-%s", err)
673 e.Cancel(err)
674 return
675 }
676 } else {
677 logger.Debugw("reconciling - pon-port already added", log.Fields{"device-id": dh.deviceID})
678 }
679 logger.Debug("doStateInit-done")
680}
681
682// postInit setups the DeviceEntry for the conerned device
683func (dh *deviceHandler) postInit(e *fsm.Event) {
684
685 logger.Debug("postInit-started")
686 var err error
687 /*
688 dh.Client = oop.NewOpenoltClient(dh.clientCon)
689 dh.pTransitionMap.Handle(ctx, GrpcConnected)
690 return nil
691 */
692 if err = dh.addOnuDeviceEntry(context.TODO()); err != nil {
693 logger.Fatalf("Device FSM: addOnuDeviceEntry-failed-%s", err)
694 e.Cancel(err)
695 return
696 }
697
698 if dh.reconciling {
699 go dh.reconcileDeviceOnuInd()
700 // reconcilement will be continued after mib download is done
701 }
702 /*
703 ############################################################################
704 # Setup Alarm handler
705 self.events = AdapterEvents(self.core_proxy, device.id, self.logical_device_id,
706 device.serial_number)
707 ############################################################################
708 # Setup PM configuration for this device
709 # Pass in ONU specific options
710 kwargs = {
711 OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
712 'heartbeat': self.heartbeat,
713 OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
714 }
715 self.logger.debug('create-pm-metrics', device_id=device.id, serial_number=device.serial_number)
716 self._pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
717 self.logical_device_id, device.serial_number,
718 grouped=True, freq_override=False, **kwargs)
719 pm_config = self._pm_metrics.make_proto()
720 self._onu_omci_device.set_pm_config(self._pm_metrics.omci_pm.openomci_interval_pm)
721 self.logger.info("initial-pm-config", device_id=device.id, serial_number=device.serial_number)
722 yield self.core_proxy.device_pm_config_update(pm_config, init=True)
723
724 # Note, ONU ID and UNI intf set in add_uni_port method
725 self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
726 ani_ports=[self._pon])
727
728 # Code to Run OMCI Test Action
729 kwargs_omci_test_action = {
730 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
731 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
732 }
733 serial_number = device.serial_number
734 self._test_request = OmciTestRequest(self.core_proxy,
735 self.omci_agent, self.device_id,
736 AniG, serial_number,
737 self.logical_device_id,
738 exclusive=False,
739 **kwargs_omci_test_action)
740
741 self.enabled = True
742 else:
743 self.logger.info('onu-already-activated')
744 */
745 logger.Debug("postInit-done")
746}
747
748// doStateConnected get the device info and update to voltha core
749// for comparison of the original method (not that easy to uncomment): compare here:
750// voltha-openolt-adapter/adaptercore/device_handler.go
751// -> this one obviously initiates all communication interfaces of the device ...?
752func (dh *deviceHandler) doStateConnected(e *fsm.Event) {
753
754 logger.Debug("doStateConnected-started")
755 err := errors.New("device FSM: function not implemented yet")
756 e.Cancel(err)
757 logger.Debug("doStateConnected-done")
758}
759
760// doStateUp handle the onu up indication and update to voltha core
761func (dh *deviceHandler) doStateUp(e *fsm.Event) {
762
763 logger.Debug("doStateUp-started")
764 err := errors.New("device FSM: function not implemented yet")
765 e.Cancel(err)
766 logger.Debug("doStateUp-done")
767
768 /*
769 // Synchronous call to update device state - this method is run in its own go routine
770 if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
771 voltha.OperStatus_ACTIVE); err != nil {
772 logger.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceID": dh.device.Id, "error": err})
773 return err
774 }
775 return nil
776 */
777}
778
779// doStateDown handle the onu down indication
780func (dh *deviceHandler) doStateDown(e *fsm.Event) {
781
782 logger.Debug("doStateDown-started")
783 var err error
784
785 device := dh.device
786 if device == nil {
787 /*TODO: needs to handle error scenarios */
788 logger.Error("Failed to fetch handler device")
789 e.Cancel(err)
790 return
791 }
792
793 cloned := proto.Clone(device).(*voltha.Device)
794 logger.Debugw("do-state-down", log.Fields{"ClonedDeviceID": cloned.Id})
795 /*
796 // Update the all ports state on that device to disable
797 if er := dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
798 logger.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": er})
799 return er
800 }
801
802 //Update the device oper state and connection status
803 cloned.OperStatus = voltha.OperStatus_UNKNOWN
804 cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
805 dh.device = cloned
806
807 if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
808 logger.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": er})
809 return er
810 }
811
812 //get the child device for the parent device
813 onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
814 if err != nil {
815 logger.Errorw("failed to get child devices information", log.Fields{"deviceID": dh.device.Id, "error": err})
816 return err
817 }
818 for _, onuDevice := range onuDevices.Items {
819
820 // Update onu state as down in onu adapter
821 onuInd := oop.OnuIndication{}
822 onuInd.OperState = "down"
823 er := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
824 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
825 if er != nil {
826 logger.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
827 "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
828 //Do not return here and continue to process other ONUs
829 }
830 }
831 // * Discovered ONUs entries need to be cleared , since after OLT
832 // is up, it starts sending discovery indications again* /
833 dh.discOnus = sync.Map{}
834 logger.Debugw("do-state-down-end", log.Fields{"deviceID": device.Id})
835 return nil
836 */
837 err = errors.New("device FSM: function not implemented yet")
838 e.Cancel(err)
839 logger.Debug("doStateDown-done")
840}
841
842// deviceHandler StateMachine related state transition methods ##### end #########
843// #################################################################################
844
845// ###################################################
846// deviceHandler utility methods ##### begin #########
847
848//getOnuDeviceEntry getsthe ONU device entry and may wait until its value is defined
849func (dh *deviceHandler) getOnuDeviceEntry(aWait bool) *OnuDeviceEntry {
850 dh.lockDevice.RLock()
851 pOnuDeviceEntry := dh.pOnuOmciDevice
852 if aWait && pOnuDeviceEntry == nil {
853 //keep the read sema short to allow for subsequent write
854 dh.lockDevice.RUnlock()
855 logger.Debugw("Waiting for DeviceEntry to be set ...", log.Fields{"device-id": dh.deviceID})
856 // based on concurrent processing the deviceEntry setup may not yet be finished at his point
857 // so it might be needed to wait here for that event with some timeout
858 select {
859 case <-time.After(60 * time.Second): //timer may be discussed ...
860 logger.Errorw("No valid DeviceEntry set after maxTime", log.Fields{"device-id": dh.deviceID})
861 return nil
862 case <-dh.deviceEntrySet:
863 logger.Debugw("devicEntry ready now - continue", log.Fields{"device-id": dh.deviceID})
864 // if written now, we can return the written value without sema
865 return dh.pOnuOmciDevice
866 }
867 }
868 dh.lockDevice.RUnlock()
869 return pOnuDeviceEntry
870}
871
872//setOnuDeviceEntry sets the ONU device entry within the handler
873func (dh *deviceHandler) setOnuDeviceEntry(
874 apDeviceEntry *OnuDeviceEntry, apOnuTp *onuUniTechProf) {
875 dh.lockDevice.Lock()
876 defer dh.lockDevice.Unlock()
877 dh.pOnuOmciDevice = apDeviceEntry
878 dh.pOnuTP = apOnuTp
879}
880
881//addOnuDeviceEntry creates a new ONU device or returns the existing
882func (dh *deviceHandler) addOnuDeviceEntry(ctx context.Context) error {
883 logger.Debugw("adding-deviceEntry", log.Fields{"device-id": dh.deviceID})
884
885 deviceEntry := dh.getOnuDeviceEntry(false)
886 if deviceEntry == nil {
887 /* costum_me_map in python code seems always to be None,
888 we omit that here first (declaration unclear) -> todo at Adapter specialization ...*/
889 /* also no 'clock' argument - usage open ...*/
890 /* and no alarm_db yet (oo.alarm_db) */
891 deviceEntry = newOnuDeviceEntry(ctx, dh.deviceID, dh.pOpenOnuAc.KVStoreHost,
892 dh.pOpenOnuAc.KVStorePort, dh.pOpenOnuAc.KVStoreType,
893 dh, dh.coreProxy, dh.AdapterProxy,
894 dh.pOpenOnuAc.pSupportedFsms) //nil as FSM pointer would yield deviceEntry internal defaults ...
895 onuTechProfProc := newOnuUniTechProf(ctx, dh.deviceID, dh)
896 //error treatment possible //TODO!!!
897 dh.setOnuDeviceEntry(deviceEntry, onuTechProfProc)
898 // fire deviceEntry ready event to spread to possibly waiting processing
899 dh.deviceEntrySet <- true
900
901 //dh.sendMsgToOlt(ctx, "adopt", nil)
902 // // update json file
903 // onustat := &OnuStatus{
904 // Id: dh.deviceID,
905 // AdminState: vc.AdminState_Types_name[int32(dh.device.AdminState)],
906 // OpeState: vc.OperStatus_Types_name[int32(dh.device.OperStatus)],
907 // ConnectState: vc.ConnectStatus_Types_name[int32(dh.device.ConnectStatus)],
908 // MacAddress: dh.device.MacAddress,
909 // }
910 // logger.Debugw("AddOnu", log.Fields{"Id": onustat.Id, "Admin": onustat.AdminState, "Ope": onustat.OpeState, "Con": onustat.ConnectState, "Mac": onustat.MacAddress})
911 // err := AddOnu(onustat)
912 // logger.Debugw("AddOnu", log.Fields{"err": err})
913 // //
914 // go WatchStatus(ctx, dh.coreProxy)
915
916 // dh.device.Root = true
917 // dh.coreProxy.DeviceUpdate(context.TODO(), dh.device)
918
919 logger.Infow("onuDeviceEntry-added", log.Fields{"device-id": dh.deviceID})
920 } else {
921 logger.Infow("onuDeviceEntry-add: Device already exists", log.Fields{"device-id": dh.deviceID})
922 }
923 return nil
924}
925
926// doStateInit provides the device update to the core
927func (dh *deviceHandler) createInterface(onuind *oop.OnuIndication) error {
928 logger.Debugw("create_interface-started", log.Fields{"OnuId": onuind.GetOnuId(),
929 "OnuIntfId": onuind.GetIntfId(), "OnuSerialNumber": onuind.GetSerialNumber()})
930
931 dh.pOnuIndication = onuind // let's revise if storing the pointer is sufficient...
932
933 if !dh.reconciling {
934 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID,
935 voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING); err != nil {
936 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
937 logger.Errorw("error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
938 }
939 } else {
940 logger.Debugw("reconciling - don't notify core about DeviceStateUpdate to ACTIVATING",
941 log.Fields{"device-id": dh.deviceID})
942 }
943 /*
944 device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
945 if err != nil || device == nil {
946 //TODO: needs to handle error scenarios
947 logger.Errorw("Failed to fetch device device at creating If", log.Fields{"err": err})
948 return errors.New("Voltha Device not found")
949 }
950 */
951
952 pDevEntry := dh.getOnuDeviceEntry(true)
953 if pDevEntry != nil {
954 if err := pDevEntry.start(context.TODO()); err != nil {
955 return err
956 }
957 } else {
958 logger.Errorw("No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
959 return errors.New("no valid OnuDevice")
960 }
961 if !dh.reconciling {
962 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "starting-openomci"); err != nil {
963 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
964 logger.Errorw("error-DeviceReasonUpdate to starting-openomci", log.Fields{"device-id": dh.deviceID, "error": err})
965 }
966 } else {
967 logger.Debugw("reconciling - don't notify core about DeviceReasonUpdate to starting-openomci",
968 log.Fields{"device-id": dh.deviceID})
969 }
970 dh.deviceReason = "starting-openomci"
971
972 /* this might be a good time for Omci Verify message? */
973 verifyExec := make(chan bool)
974 omciVerify := newOmciTestRequest(context.TODO(),
975 dh.device.Id, pDevEntry.PDevOmciCC,
976 true, true) //eclusive and allowFailure (anyway not yet checked)
977 omciVerify.performOmciTest(context.TODO(), verifyExec)
978
979 /* give the handler some time here to wait for the OMCi verification result
980 after Timeout start and try MibUpload FSM anyway
981 (to prevent stopping on just not supported OMCI verification from ONU) */
982 select {
983 case <-time.After(2 * time.Second):
984 logger.Warn("omci start-verification timed out (continue normal)")
985 case testresult := <-verifyExec:
986 logger.Infow("Omci start verification done", log.Fields{"result": testresult})
987 }
988
989 /* In py code it looks earlier (on activate ..)
990 # Code to Run OMCI Test Action
991 kwargs_omci_test_action = {
992 OmciTestRequest.DEFAULT_FREQUENCY_KEY:
993 OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
994 }
995 serial_number = device.serial_number
996 self._test_request = OmciTestRequest(self.core_proxy,
997 self.omci_agent, self.device_id,
998 AniG, serial_number,
999 self.logical_device_id,
1000 exclusive=False,
1001 **kwargs_omci_test_action)
1002 ...
1003 # Start test requests after a brief pause
1004 if not self._test_request_started:
1005 self._test_request_started = True
1006 tststart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
1007 reactor.callLater(tststart, self._test_request.start_collector)
1008
1009 */
1010 /* which is then: in omci_test_request.py : */
1011 /*
1012 def start_collector(self, callback=None):
1013 """
1014 Start the collection loop for an adapter if the frequency > 0
1015
1016 :param callback: (callable) Function to call to collect PM data
1017 """
1018 self.logger.info("starting-pm-collection", device_name=self.name, default_freq=self.default_freq)
1019 if callback is None:
1020 callback = self.perform_test_omci
1021
1022 if self.lc is None:
1023 self.lc = LoopingCall(callback)
1024
1025 if self.default_freq > 0:
1026 self.lc.start(interval=self.default_freq / 10)
1027
1028 def perform_test_omci(self):
1029 """
1030 Perform the initial test request
1031 """
1032 ani_g_entities = self._device.configuration.ani_g_entities
1033 ani_g_entities_ids = list(ani_g_entities.keys()) if ani_g_entities \
1034 is not None else None
1035 self._entity_id = ani_g_entities_ids[0]
1036 self.logger.info('perform-test', entity_class=self._entity_class,
1037 entity_id=self._entity_id)
1038 try:
1039 frame = MEFrame(self._entity_class, self._entity_id, []).test()
1040 result = yield self._device.omci_cc.send(frame)
1041 if not result.fields['omci_message'].fields['success_code']:
1042 self.logger.info('Self-Test Submitted Successfully',
1043 code=result.fields[
1044 'omci_message'].fields['success_code'])
1045 else:
1046 raise TestFailure('Test Failure: {}'.format(
1047 result.fields['omci_message'].fields['success_code']))
1048 except TimeoutError as e:
1049 self.deferred.errback(failure.Failure(e))
1050
1051 except Exception as e:
1052 self.logger.exception('perform-test-Error', e=e,
1053 class_id=self._entity_class,
1054 entity_id=self._entity_id)
1055 self.deferred.errback(failure.Failure(e))
1056
1057 */
1058
1059
1060 /* Note: Even though FSM calls look 'synchronous' here, FSM is running in background with the effect that possible errors
1061 * within the MibUpload are not notified in the OnuIndication response, this might be acceptable here,
1062 * as further OltAdapter processing may rely on the deviceReason event 'MibUploadDone' as a result of the FSM processing
1063 * otherwise some processing synchronization would be required - cmp. e.g TechProfile processing
1064 */
1065 pMibUlFsm := pDevEntry.pMibUploadFsm.pFsm
1066 if pMibUlFsm != nil {
1067 if pMibUlFsm.Is(ulStDisabled) {
1068 if err := pMibUlFsm.Event(ulEvStart); err != nil {
1069 logger.Errorw("MibSyncFsm: Can't go to state starting", log.Fields{"err": err})
1070 return errors.New("can't go to state starting")
1071 }
1072 logger.Debugw("MibSyncFsm", log.Fields{"state": string(pMibUlFsm.Current())})
1073 //Determine ONU status and start/re-start MIB Synchronization tasks
1074 //Determine if this ONU has ever synchronized
1075 if true { //TODO: insert valid check
1076 if err := pMibUlFsm.Event(ulEvResetMib); err != nil {
1077 logger.Errorw("MibSyncFsm: Can't go to state resetting_mib", log.Fields{"err": err})
1078 return errors.New("can't go to state resetting_mib")
1079 }
1080 } else {
1081 if err := pMibUlFsm.Event(ulEvExamineMds); err != nil {
1082 logger.Errorw("MibSyncFsm: Can't go to state examine_mds", log.Fields{"err": err})
1083 return errors.New("can't go to examine_mds")
1084 }
1085 logger.Debugw("state of MibSyncFsm", log.Fields{"state": string(pMibUlFsm.Current())})
1086 //Examine the MIB Data Sync
1087 // callbacks to be handled:
1088 // Event(ulEvSuccess)
1089 // Event(ulEvTimeout)
1090 // Event(ulEvMismatch)
1091 }
1092 } else {
1093 logger.Errorw("wrong state of MibSyncFsm - want: disabled", log.Fields{"have": string(pMibUlFsm.Current())})
1094 return errors.New("wrong state of MibSyncFsm")
1095 }
1096 } else {
1097 logger.Errorw("MibSyncFsm invalid - cannot be executed!!", log.Fields{"device-id": dh.deviceID})
1098 return errors.New("cannot execut MibSync")
1099 }
1100 return nil
1101}
1102
1103func (dh *deviceHandler) updateInterface(onuind *oop.OnuIndication) error {
1104 if dh.deviceReason != "stopping-openomci" {
1105 logger.Debugw("updateInterface-started - stopping-device", log.Fields{"device-id": dh.deviceID})
1106 //stop all running SM processing - make use of the DH-state as mirrored in the deviceReason
1107 pDevEntry := dh.getOnuDeviceEntry(false)
1108 if pDevEntry == nil {
1109 logger.Errorw("No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
1110 return errors.New("no valid OnuDevice")
1111 }
1112
1113 switch dh.deviceReason {
1114 case "starting-openomci":
1115 { //MIBSync FSM may run
1116 pMibUlFsm := pDevEntry.pMibUploadFsm.pFsm
1117 if pMibUlFsm != nil {
1118 _ = pMibUlFsm.Event(ulEvStop) //TODO!! verify if MibSyncFsm stop-processing is sufficient (to allow it again afterwards)
1119 }
1120 }
1121 case "discovery-mibsync-complete":
1122 { //MibDownload may run
1123 pMibDlFsm := pDevEntry.pMibDownloadFsm.pFsm
1124 if pMibDlFsm != nil {
1125 _ = pMibDlFsm.Event(dlEvReset)
1126 }
1127 }
1128 default:
1129 {
1130 //port lock/unlock FSM's may be active
1131 if dh.pUnlockStateFsm != nil {
1132 _ = dh.pUnlockStateFsm.pAdaptFsm.pFsm.Event(uniEvReset)
1133 }
1134 if dh.pLockStateFsm != nil {
1135 _ = dh.pLockStateFsm.pAdaptFsm.pFsm.Event(uniEvReset)
1136 }
1137 //techProfile related PonAniConfigFsm FSM may be active
1138 // maybe encapsulated as OnuTP method - perhaps later in context of module splitting
1139 if dh.pOnuTP.pAniConfigFsm != nil {
1140 _ = dh.pOnuTP.pAniConfigFsm.pAdaptFsm.pFsm.Event(aniEvReset)
1141 }
1142 for _, uniPort := range dh.uniEntityMap {
1143 //reset the TechProfileConfig Done state for all (active) UNI's
1144 dh.pOnuTP.setConfigDone(uniPort.uniID, false)
1145 // reset the possibly existing VlanConfigFsm
1146 if pVlanFilterFsm, exist := dh.UniVlanConfigFsmMap[uniPort.uniID]; exist {
1147 //VlanFilterFsm exists and was already started
1148 pVlanFilterStatemachine := pVlanFilterFsm.pAdaptFsm.pFsm
1149 if pVlanFilterStatemachine != nil {
1150 _ = pVlanFilterStatemachine.Event(vlanEvReset)
1151 }
1152 }
1153 }
1154 }
1155 //TODO!!! care about PM/Alarm processing once started
1156 }
1157 //TODO: from here the deviceHandler FSM itself may be stuck in some of the initial states
1158 // (mainly the still separate 'Event states')
1159 // so it is questionable, how this is resolved after some possible re-enable
1160 // assumption there is obviously, that the system may continue with some 'after "mib-download-done" state'
1161
1162 //stop/remove(?) the device entry
1163 _ = pDevEntry.stop(context.TODO()) //maybe some more sophisticated context treatment should be used here?
1164
1165 //TODO!!! remove existing traffic profiles
1166 /* from py code, if TP's exist, remove them - not yet implemented
1167 self._tp = dict()
1168 # Let TP download happen again
1169 for uni_id in self._tp_service_specific_task:
1170 self._tp_service_specific_task[uni_id].clear()
1171 for uni_id in self._tech_profile_download_done:
1172 self._tech_profile_download_done[uni_id].clear()
1173 */
1174
1175 dh.disableUniPortStateUpdate()
1176
1177 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "stopping-openomci"); err != nil {
1178 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1179 logger.Errorw("error-DeviceReasonUpdate to 'stopping-openomci'",
1180 log.Fields{"device-id": dh.deviceID, "error": err})
1181 // abort: system behavior is just unstable ...
1182 return err
1183 }
1184 dh.deviceReason = "stopping-openomci"
1185
1186 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID,
1187 voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_DISCOVERED); err != nil {
1188 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1189 logger.Errorw("error-updating-device-state unreachable-discovered",
1190 log.Fields{"device-id": dh.deviceID, "error": err})
1191 // abort: system behavior is just unstable ...
1192 return err
1193 }
1194 } else {
1195 logger.Debugw("updateInterface - device already stopped", log.Fields{"device-id": dh.deviceID})
1196 }
1197 return nil
1198}
1199
1200func (dh *deviceHandler) processMibDatabaseSyncEvent(devEvent OnuDeviceEvent) {
1201 logger.Debugw("MibInSync event received", log.Fields{"device-id": dh.deviceID})
1202 if !dh.reconciling {
1203 //initiate DevStateUpdate
1204 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "discovery-mibsync-complete"); err != nil {
1205 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1206 logger.Errorw("error-DeviceReasonUpdate to 'mibsync-complete'", log.Fields{
1207 "device-id": dh.deviceID, "error": err})
1208 } else {
1209 logger.Infow("dev reason updated to 'MibSync complete'", log.Fields{"deviceID": dh.deviceID})
1210 }
1211 } else {
1212 logger.Debugw("reconciling - don't notify core about DeviceReasonUpdate to mibsync-complete",
1213 log.Fields{"device-id": dh.deviceID})
1214 }
1215 dh.deviceReason = "discovery-mibsync-complete"
1216
1217 i := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
1218 pDevEntry := dh.getOnuDeviceEntry(false)
1219 if unigInstKeys := pDevEntry.pOnuDB.getSortedInstKeys(me.UniGClassID); len(unigInstKeys) > 0 {
1220 for _, mgmtEntityID := range unigInstKeys {
1221 logger.Debugw("Add UNI port for stored UniG instance:", log.Fields{
1222 "device-id": dh.deviceID, "UnigMe EntityID": mgmtEntityID})
1223 dh.addUniPort(mgmtEntityID, i, uniPPTP)
1224 i++
1225 }
1226 } else {
1227 logger.Debugw("No UniG instances found", log.Fields{"device-id": dh.deviceID})
1228 }
1229 if veipInstKeys := pDevEntry.pOnuDB.getSortedInstKeys(me.VirtualEthernetInterfacePointClassID); len(veipInstKeys) > 0 {
1230 for _, mgmtEntityID := range veipInstKeys {
1231 logger.Debugw("Add VEIP acc. to stored VEIP instance:", log.Fields{
1232 "device-id": dh.deviceID, "VEIP EntityID": mgmtEntityID})
1233 dh.addUniPort(mgmtEntityID, i, uniVEIP)
1234 i++
1235 }
1236 } else {
1237 logger.Debugw("No VEIP instances found", log.Fields{"device-id": dh.deviceID})
1238 }
1239 if i == 0 {
1240 logger.Warnw("No PPTP instances found", log.Fields{"device-id": dh.deviceID})
1241 }
1242
1243 /* 200605: lock processing after initial MIBUpload removed now as the ONU should be in the lock state per default here
1244 * left the code here as comment in case such processing should prove needed unexpectedly
1245 // Init Uni Ports to Admin locked state
1246 // maybe not really needed here as UNI ports should be locked by default, but still left as available in python code
1247 // *** should generate UniLockStateDone event *****
1248 if dh.pLockStateFsm == nil {
1249 dh.createUniLockFsm(true, UniLockStateDone)
1250 } else { //LockStateFSM already init
1251 dh.pLockStateFsm.SetSuccessEvent(UniLockStateDone)
1252 dh.runUniLockFsm(true)
1253 }
1254 }
1255 case UniLockStateDone:
1256 {
1257 logger.Infow("UniLockStateDone event: Starting MIB download", log.Fields{"device-id": dh.deviceID})
1258 * lockState processing commented out
1259 */
1260 /* Mib download procedure -
1261 ***** should run over 'downloaded' state and generate MibDownloadDone event *****
1262 */
1263 pMibDlFsm := pDevEntry.pMibDownloadFsm.pFsm
1264 if pMibDlFsm != nil {
1265 if pMibDlFsm.Is(dlStDisabled) {
1266 if err := pMibDlFsm.Event(dlEvStart); err != nil {
1267 logger.Errorw("MibDownloadFsm: Can't go to state starting", log.Fields{"err": err})
1268 // maybe try a FSM reset and then again ... - TODO!!!
1269 } else {
1270 logger.Debugw("MibDownloadFsm", log.Fields{"state": string(pMibDlFsm.Current())})
1271 // maybe use more specific states here for the specific download steps ...
1272 if err := pMibDlFsm.Event(dlEvCreateGal); err != nil {
1273 logger.Errorw("MibDownloadFsm: Can't start CreateGal", log.Fields{"err": err})
1274 } else {
1275 logger.Debugw("state of MibDownloadFsm", log.Fields{"state": string(pMibDlFsm.Current())})
1276 //Begin MIB data download (running autonomously)
1277 }
1278 }
1279 } else {
1280 logger.Errorw("wrong state of MibDownloadFsm - want: disabled", log.Fields{"have": string(pMibDlFsm.Current())})
1281 // maybe try a FSM reset and then again ... - TODO!!!
1282 }
1283 /***** Mib download started */
1284 } else {
1285 logger.Errorw("MibDownloadFsm invalid - cannot be executed!!", log.Fields{"device-id": dh.deviceID})
1286 }
1287}
1288
1289func (dh *deviceHandler) processMibDownloadDoneEvent(devEvent OnuDeviceEvent) {
1290 logger.Debugw("MibDownloadDone event received", log.Fields{"device-id": dh.deviceID})
1291 if !dh.reconciling {
1292 if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.deviceID,
1293 voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
1294 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1295 logger.Errorw("error-updating-device-state", log.Fields{"device-id": dh.deviceID, "error": err})
1296 } else {
1297 logger.Debugw("dev state updated to 'Oper.Active'", log.Fields{"device-id": dh.deviceID})
1298 }
1299 } else {
1300 logger.Debugw("reconciling - don't notify core about DeviceStateUpdate to ACTIVE",
1301 log.Fields{"device-id": dh.deviceID})
1302 }
1303 if !dh.reconciling {
1304 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "initial-mib-downloaded"); err != nil {
1305 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1306 logger.Errorw("error-DeviceReasonUpdate to 'initial-mib-downloaded'",
1307 log.Fields{"device-id": dh.deviceID, "error": err})
1308 } else {
1309 logger.Infow("dev reason updated to 'initial-mib-downloaded'", log.Fields{"device-id": dh.deviceID})
1310 }
1311 } else {
1312 logger.Debugw("reconciling - don't notify core about DeviceReasonUpdate to initial-mib-downloaded",
1313 log.Fields{"device-id": dh.deviceID})
1314 }
1315 dh.deviceReason = "initial-mib-downloaded"
1316 if dh.pUnlockStateFsm == nil {
1317 dh.createUniLockFsm(false, UniUnlockStateDone)
1318 } else { //UnlockStateFSM already init
1319 dh.pUnlockStateFsm.setSuccessEvent(UniUnlockStateDone)
1320 dh.runUniLockFsm(false)
1321 }
1322}
1323
1324func (dh *deviceHandler) processUniUnlockStateDoneEvent(devEvent OnuDeviceEvent) {
1325 go dh.enableUniPortStateUpdate() //cmp python yield self.enable_ports()
1326
1327 if !dh.reconciling {
1328 logger.Infow("UniUnlockStateDone event: Sending OnuUp event", log.Fields{"device-id": dh.deviceID})
1329 raisedTs := time.Now().UnixNano()
1330 go dh.sendOnuOperStateEvent(voltha.OperStatus_ACTIVE, dh.deviceID, raisedTs) //cmp python onu_active_event
1331 } else {
1332 logger.Debugw("reconciling - don't notify core that onu went to active but trigger tech profile config",
1333 log.Fields{"device-id": dh.deviceID})
1334 go dh.reconcileDeviceTechProf()
1335 //TODO: further actions e.g. restore flows, metrics, ...
1336 }
1337}
1338
1339func (dh *deviceHandler) processOmciAniConfigDoneEvent(devEvent OnuDeviceEvent) {
1340 logger.Debugw("OmciAniConfigDone event received", log.Fields{"device-id": dh.deviceID})
1341 if dh.deviceReason != "tech-profile-config-download-success" {
1342 // which may be the case from some previous actvity on another UNI Port of the ONU
1343 if !dh.reconciling {
1344 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "tech-profile-config-download-success"); err != nil {
1345 //TODO with VOL-3045/VOL-3046: return the error and stop further processing
1346 logger.Errorw("error-DeviceReasonUpdate to 'tech-profile-config-download-success'",
1347 log.Fields{"device-id": dh.deviceID, "error": err})
1348 } else {
1349 logger.Infow("update dev reason to 'tech-profile-config-download-success'",
1350 log.Fields{"device-id": dh.deviceID})
1351 }
1352 } else {
1353 logger.Debugw("reconciling - don't notify core about DeviceReasonUpdate to tech-profile-config-download-success",
1354 log.Fields{"device-id": dh.deviceID})
1355 }
1356 //set internal state anyway - as it was done
1357 dh.deviceReason = "tech-profile-config-download-success"
1358 }
1359}
1360
1361func (dh *deviceHandler) processOmciVlanFilterDoneEvent(devEvent OnuDeviceEvent) {
1362 logger.Debugw("OmciVlanFilterDone event received",
1363 log.Fields{"device-id": dh.deviceID})
1364
1365 if dh.deviceReason != "omci-flows-pushed" {
1366 // which may be the case from some previous actvity on another UNI Port of the ONU
1367 // or even some previous flow add activity on the same port
1368 if err := dh.coreProxy.DeviceReasonUpdate(context.TODO(), dh.deviceID, "omci-flows-pushed"); err != nil {
1369 logger.Errorw("error-DeviceReasonUpdate to 'omci-flows-pushed'",
1370 log.Fields{"device-id": dh.deviceID, "error": err})
1371 } else {
1372 logger.Infow("updated dev reason to ''omci-flows-pushed'",
1373 log.Fields{"device-id": dh.deviceID})
1374 }
1375 //set internal state anyway - as it was done
1376 dh.deviceReason = "omci-flows-pushed"
1377 }
1378}
1379
1380//deviceProcStatusUpdate evaluates possible processing events and initiates according next activities
1381func (dh *deviceHandler) deviceProcStatusUpdate(devEvent OnuDeviceEvent) {
1382 switch devEvent {
1383 case MibDatabaseSync:
1384 {
1385 dh.processMibDatabaseSyncEvent(devEvent)
1386 }
1387 case MibDownloadDone:
1388 {
1389 dh.processMibDownloadDoneEvent(devEvent)
1390
1391 }
1392 case UniUnlockStateDone:
1393 {
1394 dh.processUniUnlockStateDoneEvent(devEvent)
1395
1396 }
1397 case OmciAniConfigDone:
1398 {
1399 dh.processOmciAniConfigDoneEvent(devEvent)
1400
1401 }
1402 case OmciVlanFilterDone:
1403 {
1404 dh.processOmciVlanFilterDoneEvent(devEvent)
1405
1406 }
1407 default:
1408 {
1409 logger.Warnw("unhandled-device-event", log.Fields{"device-id": dh.deviceID, "event": devEvent})
1410 }
1411 } //switch
1412}
1413
1414func (dh *deviceHandler) addUniPort(aUniInstNo uint16, aUniID uint8, aPortType uniPortType) {
1415 uniNo := mkUniPortNum(dh.pOnuIndication.GetIntfId(), dh.pOnuIndication.GetOnuId(),
1416 uint32(aUniID))
1417 if _, present := dh.uniEntityMap[uniNo]; present {
1418 logger.Warnw("onuUniPort-add: Port already exists", log.Fields{"for InstanceId": aUniInstNo})
1419 } else {
1420 //with arguments aUniID, a_portNo, aPortType
1421 pUniPort := newOnuUniPort(aUniID, uniNo, aUniInstNo, aPortType)
1422 if pUniPort == nil {
1423 logger.Warnw("onuUniPort-add: Could not create Port", log.Fields{"for InstanceId": aUniInstNo})
1424 } else {
1425 //store UniPort with the System-PortNumber key
1426 dh.uniEntityMap[uniNo] = pUniPort
1427 if !dh.reconciling {
1428 // create announce the UniPort to the core as VOLTHA Port object
1429 if err := pUniPort.createVolthaPort(dh); err == nil {
1430 logger.Infow("onuUniPort-added", log.Fields{"for PortNo": uniNo})
1431 } //error logging already within UniPort method
1432 } else {
1433 logger.Debugw("reconciling - onuUniPort already added", log.Fields{"for PortNo": uniNo, "device-id": dh.deviceID})
1434 }
1435 }
1436 }
1437}
1438
1439// enableUniPortStateUpdate enables UniPortState and update core port state accordingly
1440func (dh *deviceHandler) enableUniPortStateUpdate() {
1441
1442
1443 for uniNo, uniPort := range dh.uniEntityMap {
1444 // only if this port is validated for operState transfer
1445 if (1<<uniPort.uniID)&activeUniPortStateUpdateMask == (1 << uniPort.uniID) {
1446 logger.Infow("onuUniPort-forced-OperState-ACTIVE", log.Fields{"for PortNo": uniNo})
1447 uniPort.setOperState(vc.OperStatus_ACTIVE)
1448 if !dh.reconciling {
1449 //maybe also use getter functions on uniPort - perhaps later ...
1450 go dh.coreProxy.PortStateUpdate(context.TODO(), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
1451 } else {
1452 logger.Debugw("reconciling - don't notify core about PortStateUpdate", log.Fields{"device-id": dh.deviceID})
1453 }
1454 }
1455 }
1456}
1457
1458// Disable UniPortState and update core port state accordingly
1459func (dh *deviceHandler) disableUniPortStateUpdate() {
1460 for uniNo, uniPort := range dh.uniEntityMap {
1461 // only if this port is validated for operState transfer
1462 if (1<<uniPort.uniID)&activeUniPortStateUpdateMask == (1 << uniPort.uniID) {
1463 logger.Infow("onuUniPort-forced-OperState-UNKNOWN", log.Fields{"for PortNo": uniNo})
1464 uniPort.setOperState(vc.OperStatus_UNKNOWN)
1465 //maybe also use getter functions on uniPort - perhaps later ...
1466 go dh.coreProxy.PortStateUpdate(context.TODO(), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
1467 }
1468 }
1469}
1470
1471// ONU_Active/Inactive announcement on system KAFKA bus
1472// tried to re-use procedure of oltUpDownIndication from openolt_eventmgr.go with used values from Py code
1473func (dh *deviceHandler) sendOnuOperStateEvent(aOperState vc.OperStatus_Types, aDeviceID string, raisedTs int64) {
1474 var de voltha.DeviceEvent
1475 eventContext := make(map[string]string)
1476 parentDevice, err := dh.coreProxy.GetDevice(context.TODO(), dh.parentID, dh.parentID)
1477 if err != nil || parentDevice == nil {
1478 logger.Errorw("Failed to fetch parent device for OnuEvent",
1479 log.Fields{"parentID": dh.parentID, "err": err})
1480 }
1481 oltSerialNumber := parentDevice.SerialNumber
1482
1483 eventContext["pon-id"] = strconv.FormatUint(uint64(dh.pOnuIndication.IntfId), 10)
1484 eventContext["onu-id"] = strconv.FormatUint(uint64(dh.pOnuIndication.OnuId), 10)
1485 eventContext["serial-number"] = dh.device.SerialNumber
1486 eventContext["olt_serial_number"] = oltSerialNumber
1487 eventContext["device_id"] = aDeviceID
1488 eventContext["registration_id"] = aDeviceID //py: string(device_id)??
1489 logger.Debugw("prepare ONU_ACTIVATED event",
1490 log.Fields{"DeviceId": aDeviceID, "EventContext": eventContext})
1491
1492 /* Populating device event body */
1493 de.Context = eventContext
1494 de.ResourceId = aDeviceID
1495 if aOperState == voltha.OperStatus_ACTIVE {
1496 de.DeviceEventName = fmt.Sprintf("%s_%s", cOnuActivatedEvent, "RAISE_EVENT")
1497 de.Description = fmt.Sprintf("%s Event - %s - %s",
1498 cEventObjectType, cOnuActivatedEvent, "Raised")
1499 } else {
1500 de.DeviceEventName = fmt.Sprintf("%s_%s", cOnuActivatedEvent, "CLEAR_EVENT")
1501 de.Description = fmt.Sprintf("%s Event - %s - %s",
1502 cEventObjectType, cOnuActivatedEvent, "Cleared")
1503 }
1504 /* Send event to KAFKA */
1505 if err := dh.EventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
1506 logger.Warnw("could not send ONU_ACTIVATED event",
1507 log.Fields{"device-id": aDeviceID, "error": err})
1508 }
1509 logger.Debugw("ONU_ACTIVATED event sent to KAFKA",
1510 log.Fields{"device-id": aDeviceID, "with-EventName": de.DeviceEventName})
1511}
1512
1513// createUniLockFsm initializes and runs the UniLock FSM to transfer the OMCI related commands for port lock/unlock
1514func (dh *deviceHandler) createUniLockFsm(aAdminState bool, devEvent OnuDeviceEvent) {
1515 chLSFsm := make(chan Message, 2048)
1516 var sFsmName string
1517 if aAdminState {
1518 logger.Infow("createLockStateFSM", log.Fields{"device-id": dh.deviceID})
1519 sFsmName = "LockStateFSM"
1520 } else {
1521 logger.Infow("createUnlockStateFSM", log.Fields{"device-id": dh.deviceID})
1522 sFsmName = "UnLockStateFSM"
1523 }
1524
1525 pDevEntry := dh.getOnuDeviceEntry(true)
1526 if pDevEntry == nil {
1527 logger.Errorw("No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
1528 return
1529 }
1530 pLSFsm := newLockStateFsm(pDevEntry.PDevOmciCC, aAdminState, devEvent,
1531 sFsmName, dh.deviceID, chLSFsm)
1532 if pLSFsm != nil {
1533 if aAdminState {
1534 dh.pLockStateFsm = pLSFsm
1535 } else {
1536 dh.pUnlockStateFsm = pLSFsm
1537 }
1538 dh.runUniLockFsm(aAdminState)
1539 } else {
1540 logger.Errorw("LockStateFSM could not be created - abort!!", log.Fields{"device-id": dh.deviceID})
1541 }
1542}
1543
1544// runUniLockFsm starts the UniLock FSM to transfer the OMCI related commands for port lock/unlock
1545func (dh *deviceHandler) runUniLockFsm(aAdminState bool) {
1546 /* Uni Port lock/unlock procedure -
1547 ***** should run via 'adminDone' state and generate the argument requested event *****
1548 */
1549 var pLSStatemachine *fsm.FSM
1550 if aAdminState {
1551 pLSStatemachine = dh.pLockStateFsm.pAdaptFsm.pFsm
1552 //make sure the opposite FSM is not running and if so, terminate it as not relevant anymore
1553 if (dh.pUnlockStateFsm != nil) &&
1554 (dh.pUnlockStateFsm.pAdaptFsm.pFsm.Current() != uniStDisabled) {
1555 _ = dh.pUnlockStateFsm.pAdaptFsm.pFsm.Event(uniEvReset)
1556 }
1557 } else {
1558 pLSStatemachine = dh.pUnlockStateFsm.pAdaptFsm.pFsm
1559 //make sure the opposite FSM is not running and if so, terminate it as not relevant anymore
1560 if (dh.pLockStateFsm != nil) &&
1561 (dh.pLockStateFsm.pAdaptFsm.pFsm.Current() != uniStDisabled) {
1562 _ = dh.pLockStateFsm.pAdaptFsm.pFsm.Event(uniEvReset)
1563 }
1564 }
1565 if pLSStatemachine != nil {
1566 if pLSStatemachine.Is(uniStDisabled) {
1567 if err := pLSStatemachine.Event(uniEvStart); err != nil {
1568 logger.Warnw("LockStateFSM: can't start", log.Fields{"err": err})
1569 // maybe try a FSM reset and then again ... - TODO!!!
1570 } else {
1571 /***** LockStateFSM started */
1572 logger.Debugw("LockStateFSM started", log.Fields{
1573 "state": pLSStatemachine.Current(), "device-id": dh.deviceID})
1574 }
1575 } else {
1576 logger.Warnw("wrong state of LockStateFSM - want: disabled", log.Fields{
1577 "have": pLSStatemachine.Current(), "device-id": dh.deviceID})
1578 // maybe try a FSM reset and then again ... - TODO!!!
1579 }
1580 } else {
1581 logger.Errorw("LockStateFSM StateMachine invalid - cannot be executed!!", log.Fields{"device-id": dh.deviceID})
1582 // maybe try a FSM reset and then again ... - TODO!!!
1583 }
1584}
1585
1586//setBackend provides a DB backend for the specified path on the existing KV client
1587func (dh *deviceHandler) setBackend(aBasePathKvStore string) *db.Backend {
1588 addr := dh.pOpenOnuAc.KVStoreHost + ":" + strconv.Itoa(dh.pOpenOnuAc.KVStorePort)
1589 logger.Debugw("SetKVStoreBackend", log.Fields{"IpTarget": addr,
1590 "BasePathKvStore": aBasePathKvStore, "device-id": dh.deviceID})
1591 kvbackend := &db.Backend{
1592 Client: dh.pOpenOnuAc.kvClient,
1593 StoreType: dh.pOpenOnuAc.KVStoreType,
1594 /* address config update acc. to [VOL-2736] */
1595 Address: addr,
1596 Timeout: dh.pOpenOnuAc.KVStoreTimeout,
1597 PathPrefix: aBasePathKvStore}
1598
1599 return kvbackend
1600}
1601func (dh *deviceHandler) getFlowOfbFields(apFlowItem *ofp.OfpFlowStats, loMatchVlan *uint16,
1602 loAddPcp *uint8, loIPProto *uint32) {
1603
1604 for _, field := range flow.GetOfbFields(apFlowItem) {
1605 switch field.Type {
1606 case of.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
1607 {
1608 logger.Debugw("FlowAdd type EthType", log.Fields{"device-id": dh.deviceID,
1609 "EthType": strconv.FormatInt(int64(field.GetEthType()), 16)})
1610 }
1611 case of.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
1612 {
1613 *loIPProto = field.GetIpProto()
1614 logger.Debugw("FlowAdd type IpProto", log.Fields{"device-id": dh.deviceID,
1615 "IpProto": strconv.FormatInt(int64(*loIPProto), 16)})
1616 if *loIPProto == 2 {
1617 // some workaround for TT workflow at proto == 2 (IGMP trap) -> ignore the flow
1618 // avoids installing invalid EVTOCD rule
1619 logger.Debugw("FlowAdd type IpProto 2: TT workaround: ignore flow",
1620 log.Fields{"device-id": dh.deviceID,
1621 "IpProto": strconv.FormatInt(int64(*loIPProto), 16)})
1622 return
1623 }
1624 }
1625 case of.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
1626 {
1627 *loMatchVlan = uint16(field.GetVlanVid())
1628 loMatchVlanMask := uint16(field.GetVlanVidMask())
1629 if !(*loMatchVlan == uint16(of.OfpVlanId_OFPVID_PRESENT) &&
1630 loMatchVlanMask == uint16(of.OfpVlanId_OFPVID_PRESENT)) {
1631 *loMatchVlan = *loMatchVlan & 0xFFF // not transparent: copy only ID bits
1632 }
1633 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1634 "VID": strconv.FormatInt(int64(*loMatchVlan), 16)})
1635 }
1636 case of.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP:
1637 {
1638 *loAddPcp = uint8(field.GetVlanPcp())
1639 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1640 "PCP": loAddPcp})
1641 }
1642 case of.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
1643 {
1644 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1645 "UDP-DST": strconv.FormatInt(int64(field.GetUdpDst()), 16)})
1646 }
1647 case of.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
1648 {
1649 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1650 "UDP-SRC": strconv.FormatInt(int64(field.GetUdpSrc()), 16)})
1651 }
1652 case of.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_DST:
1653 {
1654 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1655 "IPv4-DST": field.GetIpv4Dst()})
1656 }
1657 case of.OxmOfbFieldTypes_OFPXMT_OFB_IPV4_SRC:
1658 {
1659 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1660 "IPv4-SRC": field.GetIpv4Src()})
1661 }
1662 case of.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
1663 {
1664 logger.Debugw("FlowAdd field type", log.Fields{"device-id": dh.deviceID,
1665 "Metadata": field.GetTableMetadata()})
1666 }
1667 /*
1668 default:
1669 {
1670 //all other entires ignored
1671 }
1672 */
1673 }
1674 } //for all OfbFields
1675}
1676
1677func (dh *deviceHandler) getFlowActions(apFlowItem *ofp.OfpFlowStats, loSetPcp *uint8, loSetVlan *uint16) {
1678 for _, action := range flow.GetActions(apFlowItem) {
1679 switch action.Type {
1680 /* not used:
1681 case of.OfpActionType_OFPAT_OUTPUT:
1682 {
1683 logger.Debugw("FlowAdd action type", log.Fields{"device-id": dh.deviceID,
1684 "Output": action.GetOutput()})
1685 }
1686 */
1687 case of.OfpActionType_OFPAT_PUSH_VLAN:
1688 {
1689 logger.Debugw("FlowAdd action type", log.Fields{"device-id": dh.deviceID,
1690 "PushEthType": strconv.FormatInt(int64(action.GetPush().Ethertype), 16)})
1691 }
1692 case of.OfpActionType_OFPAT_SET_FIELD:
1693 {
1694 pActionSetField := action.GetSetField()
1695 if pActionSetField.Field.OxmClass != of.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
1696 logger.Warnw("FlowAdd action SetField invalid OxmClass (ignored)", log.Fields{"device-id": dh.deviceID,
1697 "OxcmClass": pActionSetField.Field.OxmClass})
1698 }
1699 if pActionSetField.Field.GetOfbField().Type == of.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
1700 *loSetVlan = uint16(pActionSetField.Field.GetOfbField().GetVlanVid())
1701 logger.Debugw("FlowAdd Set VLAN from SetField action", log.Fields{"device-id": dh.deviceID,
1702 "SetVlan": strconv.FormatInt(int64(*loSetVlan), 16)})
1703 } else if pActionSetField.Field.GetOfbField().Type == of.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP {
1704 *loSetPcp = uint8(pActionSetField.Field.GetOfbField().GetVlanPcp())
1705 logger.Debugw("FlowAdd Set PCP from SetField action", log.Fields{"device-id": dh.deviceID,
1706 "SetPcp": *loSetPcp})
1707 } else {
1708 logger.Warnw("FlowAdd action SetField invalid FieldType", log.Fields{"device-id": dh.deviceID,
1709 "Type": pActionSetField.Field.GetOfbField().Type})
1710 }
1711 }
1712 /*
1713 default:
1714 {
1715 //all other entires ignored
1716 }
1717 */
1718 }
1719 } //for all Actions
1720}
1721
1722//addFlowItemToUniPort parses the actual flow item to add it to the UniPort
1723func (dh *deviceHandler) addFlowItemToUniPort(apFlowItem *ofp.OfpFlowStats, apUniPort *onuUniPort) error {
1724 var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE) //noValidEntry
1725 var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
1726 var loAddPcp, loSetPcp uint8
1727 var loIPProto uint32
1728 /* the TechProfileId is part of the flow Metadata - compare also comment within
1729 * OLT-Adapter:openolt_flowmgr.go
1730 * Metadata 8 bytes:
1731 * Most Significant 2 Bytes = Inner VLAN
1732 * Next 2 Bytes = Tech Profile ID(TPID)
1733 * Least Significant 4 Bytes = Port ID
1734 * Flow Metadata carries Tech-Profile (TP) ID and is mandatory in all
1735 * subscriber related flows.
1736 */
1737
1738 metadata := flow.GetMetadataFromWriteMetadataAction(apFlowItem)
1739 if metadata == 0 {
1740 logger.Debugw("FlowAdd invalid metadata - abort",
1741 log.Fields{"device-id": dh.deviceID})
1742 return errors.New("flowAdd invalid metadata")
1743 }
1744 loTpID := flow.GetTechProfileIDFromWriteMetaData(metadata)
1745 logger.Debugw("FlowAdd TechProfileId", log.Fields{"device-id": dh.deviceID, "TP-Id": loTpID})
1746
1747 dh.getFlowOfbFields(apFlowItem, &loMatchVlan, &loAddPcp, &loIPProto)
1748 if loIPProto == 2 {
1749 // some workaround for TT workflow at proto == 2 (IGMP trap) -> ignore the flow
1750 // avoids installing invalid EVTOCD rule
1751 logger.Debugw("FlowAdd type IpProto 2: TT workaround: ignore flow",
1752 log.Fields{"device-id": dh.deviceID,
1753 "IpProto": strconv.FormatInt(int64(loIPProto), 16)})
1754 return nil
1755 }
1756 dh.getFlowActions(apFlowItem, &loSetPcp, &loSetVlan)
1757
1758 if loSetVlan == uint16(of.OfpVlanId_OFPVID_NONE) && loMatchVlan != uint16(of.OfpVlanId_OFPVID_PRESENT) {
1759 logger.Errorw("FlowAdd aborted - SetVlanId undefined, but MatchVid set", log.Fields{
1760 "device-id": dh.deviceID, "UniPort": apUniPort.portNo,
1761 "set_vid": strconv.FormatInt(int64(loSetVlan), 16),
1762 "match_vid": strconv.FormatInt(int64(loMatchVlan), 16)})
1763 //TODO!!: Use DeviceId within the error response to rwCore
1764 // likewise also in other error response cases to calling components as requested in [VOL-3458]
1765 return errors.New("flowAdd Set/Match VlanId inconsistent")
1766 }
1767 if loSetVlan == uint16(of.OfpVlanId_OFPVID_NONE) && loMatchVlan == uint16(of.OfpVlanId_OFPVID_PRESENT) {
1768 logger.Debugw("FlowAdd vlan-any/copy", log.Fields{"device-id": dh.deviceID})
1769 loSetVlan = loMatchVlan //both 'transparent' (copy any)
1770 } else {
1771 //looks like OMCI value 4097 (copyFromOuter - for Uni double tagged) is not supported here
1772 if loSetVlan != uint16(of.OfpVlanId_OFPVID_PRESENT) {
1773 // not set to transparent
1774 loSetVlan &= 0x0FFF //mask VID bits as prerequisite for vlanConfigFsm
1775 }
1776 logger.Debugw("FlowAdd vlan-set", log.Fields{"device-id": dh.deviceID})
1777 }
1778 if _, exist := dh.UniVlanConfigFsmMap[apUniPort.uniID]; exist {
1779 logger.Errorw("FlowAdd aborted - FSM already running", log.Fields{
1780 "device-id": dh.deviceID, "UniPort": apUniPort.portNo})
1781 return errors.New("flowAdd FSM already running")
1782 }
1783 return dh.createVlanFilterFsm(apUniPort,
1784 loTpID, loMatchVlan, loSetVlan, loSetPcp, OmciVlanFilterDone)
1785}
1786
1787// createVlanFilterFsm initializes and runs the VlanFilter FSM to transfer OMCI related VLAN config
1788func (dh *deviceHandler) createVlanFilterFsm(apUniPort *onuUniPort,
1789 aTpID uint16, aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent OnuDeviceEvent) error {
1790 chVlanFilterFsm := make(chan Message, 2048)
1791
1792 pDevEntry := dh.getOnuDeviceEntry(true)
1793 if pDevEntry == nil {
1794 logger.Errorw("No valid OnuDevice -aborting", log.Fields{"device-id": dh.deviceID})
1795 return fmt.Errorf("no valid OnuDevice for device-id %x - aborting", dh.deviceID)
1796 }
1797
1798 pVlanFilterFsm := NewUniVlanConfigFsm(dh, pDevEntry.PDevOmciCC, apUniPort, dh.pOnuTP,
1799 pDevEntry.pOnuDB, aTpID, aDevEvent, "UniVlanConfigFsm", dh.deviceID, chVlanFilterFsm,
1800 dh.pOpenOnuAc.AcceptIncrementalEvto, aMatchVlan, aSetVlan, aSetPcp)
1801 if pVlanFilterFsm != nil {
1802 dh.UniVlanConfigFsmMap[apUniPort.uniID] = pVlanFilterFsm
1803 pVlanFilterStatemachine := pVlanFilterFsm.pAdaptFsm.pFsm
1804 if pVlanFilterStatemachine != nil {
1805 if pVlanFilterStatemachine.Is(vlanStDisabled) {
1806 if err := pVlanFilterStatemachine.Event(vlanEvStart); err != nil {
1807 logger.Warnw("UniVlanConfigFsm: can't start", log.Fields{"err": err})
1808 return fmt.Errorf("can't start UniVlanConfigFsm for device-id %x", dh.deviceID)
1809 }
1810 /***** UniVlanConfigFsm started */
1811 logger.Debugw("UniVlanConfigFsm started", log.Fields{
1812 "state": pVlanFilterStatemachine.Current(), "device-id": dh.deviceID,
1813 "UniPort": apUniPort.portNo})
1814 } else {
1815 logger.Warnw("wrong state of UniVlanConfigFsm - want: disabled", log.Fields{
1816 "have": pVlanFilterStatemachine.Current(), "device-id": dh.deviceID})
1817 return fmt.Errorf("uniVlanConfigFsm not in expected disabled state for device-id %x", dh.deviceID)
1818 }
1819 } else {
1820 logger.Errorw("UniVlanConfigFsm StateMachine invalid - cannot be executed!!", log.Fields{
1821 "device-id": dh.deviceID})
1822 return fmt.Errorf("uniVlanConfigFsm invalid for device-id %x", dh.deviceID)
1823 }
1824 } else {
1825 logger.Errorw("UniVlanConfigFsm could not be created - abort!!", log.Fields{
1826 "device-id": dh.deviceID, "UniPort": apUniPort.portNo})
1827 return fmt.Errorf("uniVlanConfigFsm could not be created for device-id %x", dh.deviceID)
1828 }
1829 return nil
1830}
1831
1832//verifyUniVlanConfigRequest checks on existence of flow configuration and starts it accordingly
1833func (dh *deviceHandler) verifyUniVlanConfigRequest(apUniPort *onuUniPort) {
1834 if pVlanFilterFsm, exist := dh.UniVlanConfigFsmMap[apUniPort.uniID]; exist {
1835 //VlanFilterFsm exists and was already started (assumed to wait for TechProfile execution here)
1836 pVlanFilterStatemachine := pVlanFilterFsm.pAdaptFsm.pFsm
1837 if pVlanFilterStatemachine != nil {
1838 if pVlanFilterStatemachine.Is(vlanStWaitingTechProf) {
1839 if err := pVlanFilterStatemachine.Event(vlanEvContinueConfig); err != nil {
1840 logger.Warnw("UniVlanConfigFsm: can't continue processing", log.Fields{"err": err})
1841 } else {
1842 /***** UniVlanConfigFsm continued */
1843 logger.Debugw("UniVlanConfigFsm continued", log.Fields{
1844 "state": pVlanFilterStatemachine.Current(), "device-id": dh.deviceID,
1845 "UniPort": apUniPort.portNo})
1846 }
1847 } else {
1848 logger.Debugw("no state of UniVlanConfigFsm to be continued", log.Fields{
1849 "have": pVlanFilterStatemachine.Current(), "device-id": dh.deviceID})
1850 }
1851 } else {
1852 logger.Debugw("UniVlanConfigFsm StateMachine does not exist, no flow processing", log.Fields{
1853 "device-id": dh.deviceID})
1854 }
1855
1856 } // else: nothing to do
1857}
1858
1859//RemoveVlanFilterFsm deletes the stored pointer to the VlanConfigFsm
1860// intention is to provide this method to be called from VlanConfigFsm itself, when resources (and methods!) are cleaned up
1861func (dh *deviceHandler) RemoveVlanFilterFsm(apUniPort *onuUniPort) {
1862 logger.Debugw("remove UniVlanConfigFsm StateMachine", log.Fields{
1863 "device-id": dh.deviceID, "uniPort": apUniPort.portNo})
1864 delete(dh.UniVlanConfigFsmMap, apUniPort.uniID)
1865}