general update for library compatibility, [VOL-3202], some TechProf processing restructuring
Signed-off-by: mpagenko <michael.pagenkopf@adtran.com>
Change-Id: I451c663fea4dc3ea5acd141069e64a1ad2a68d58
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 1aca121..bbd7660 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -23,7 +23,6 @@
"errors"
"fmt"
"strconv"
- "strings"
"sync"
"time"
@@ -32,10 +31,10 @@
"github.com/looplab/fsm"
me "github.com/opencord/omci-lib-go/generated"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
vc "github.com/opencord/voltha-protos/v3/go/common"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
- of "github.com/opencord/voltha-protos/v3/go/openflow_13"
oop "github.com/opencord/voltha-protos/v3/go/openolt"
"github.com/opencord/voltha-protos/v3/go/voltha"
)
@@ -70,27 +69,6 @@
cOnuActivatedEvent = "ONU_ACTIVATED"
)
-type resourceEntry int
-
-const (
- cResourceGemPort resourceEntry = 1
- cResourceTcont resourceEntry = 2
-)
-
-type OnuSerialNumber struct {
- VendorId []byte
- VendorSpecific []byte
-}
-
-type onuPersistentData struct {
- persOnuID uint32
- persIntfID uint32
- persSnr OnuSerialNumber
- persAdminState string
- persOperState string
- persUniTpPath map[uint32]string
-}
-
//DeviceHandler will interact with the ONU ? device.
type DeviceHandler struct {
deviceID string
@@ -107,14 +85,12 @@
AdapterProxy adapterif.AdapterProxy
EventProxy adapterif.EventProxy
- tpProcMutex sync.RWMutex
- sOnuPersistentData onuPersistentData
-
pOpenOnuAc *OpenONUAC
pDeviceStateFsm *fsm.FSM
pPonPort *voltha.Port
deviceEntrySet chan bool //channel for DeviceEntry set event
pOnuOmciDevice *OnuDeviceEntry
+ pOnuTP *OnuUniTechProf
exitChannel chan int
lockDevice sync.RWMutex
pOnuIndication *oop.OnuIndication
@@ -147,8 +123,6 @@
dh.DeviceType = cloned.Type
dh.adminState = "up"
dh.device = cloned
- dh.tpProcMutex = sync.RWMutex{}
- dh.sOnuPersistentData.persUniTpPath = make(map[uint32]string)
dh.pOpenOnuAc = adapter
dh.exitChannel = make(chan int, 1)
dh.lockDevice = sync.RWMutex{}
@@ -181,6 +155,7 @@
"before_DeviceDownInd": func(e *fsm.Event) { dh.doStateDown(e) },
},
)
+
return &dh
}
@@ -287,85 +262,106 @@
return errors.New("InvalidOperState")
}
}
- // TODO: temporarily commented out - see https://gerrit.opencord.org/#/c/19330/
- // case ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST:
- // {
- // msgBody := msg.GetBody()
- // techProfMsg := &ic.InterAdapterTechProfileDownloadMessage{}
- // if err := ptypes.UnmarshalAny(msgBody, techProfMsg); err != nil {
- // logger.Warnw("cannot-unmarshal-techprof-msg-body", log.Fields{
- // "deviceID": dh.deviceID, "error": err})
- // return err
- // }
- // // we have to lock access to TechProfile processing based on different messageType calls or
- // // even to fast subsequent calls of the same messageType
- // dh.tpProcMutex.Lock()
- // // lock hangs as long as below decoupled or other related TechProfile processing is active
- // if bTpModify := dh.updateOnuUniTpPath(techProfMsg.UniId, techProfMsg.Path); bTpModify == true {
- // // if there has been some change for some uni TechProfilePath
- // //in order to allow concurrent calls to other dh instances we do not wait for execution here
- // //but doing so we can not indicate problems to the caller (who does what with that then?)
- // //by now we just assume straightforward successful execution
- // //TODO!!! Generally: In this scheme it would be good to have some means to indicate
- // // possible problems to the caller later autonomously
+ case ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST:
+ {
+ if dh.pOnuTP == nil {
+ //should normally not happen ...
+ logger.Warnw("onuTechProf instance not set up for DLMsg request - ignoring request",
+ log.Fields{"deviceID": dh.deviceID})
+ return errors.New("TechProfile DLMsg request while onuTechProf instance not setup")
+ }
- // // some code to coordinate TP 'run to completion'
- // // attention: completion and wg.Add is assumed to be doen in both routines,
- // // no timeout control so far (needed)
- // var wg sync.WaitGroup
- // wg.Add(2) // for the 2 go routines to finish
- // go dh.configureUniTp(techProfMsg.UniId, techProfMsg.Path, &wg)
- // go dh.updateOnuTpPathKvStore(&wg)
- // //the wait.. function is responsible for tpProcMutex.Unlock()
- // go dh.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
- // } else {
- // dh.tpProcMutex.Unlock()
- // }
- // }
- // case ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST:
- // {
- // msgBody := msg.GetBody()
- // delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{}
- // if err := ptypes.UnmarshalAny(msgBody, delGemPortMsg); err != nil {
- // logger.Warnw("cannot-unmarshal-delete-gem-msg-body", log.Fields{
- // "deviceID": dh.deviceID, "error": err})
- // return err
- // }
+ msgBody := msg.GetBody()
+ techProfMsg := &ic.InterAdapterTechProfileDownloadMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, techProfMsg); err != nil {
+ logger.Warnw("cannot-unmarshal-techprof-msg-body", log.Fields{
+ "deviceID": dh.deviceID, "error": err})
+ return err
+ }
- // //compare TECH_PROFILE_DOWNLOAD_REQUEST
- // dh.tpProcMutex.Lock()
- // var wg sync.WaitGroup
- // wg.Add(1) // for the 1 go routine to finish
- // go dh.deleteTpRessource(delGemPortMsg.UniId, delGemPortMsg.TpPath,
- // cResourceGemPort, delGemPortMsg.GemPortId, &wg)
- // //the wait.. function is responsible for tpProcMutex.Unlock()
- // go dh.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
- // }
- // case ic.InterAdapterMessageType_DELETE_TCONT_REQUEST:
- // {
- // msgBody := msg.GetBody()
- // delTcontMsg := &ic.InterAdapterDeleteTcontMessage{}
- // if err := ptypes.UnmarshalAny(msgBody, delTcontMsg); err != nil {
- // logger.Warnw("cannot-unmarshal-delete-tcont-msg-body", log.Fields{
- // "deviceID": dh.deviceID, "error": err})
- // return err
- // }
+ // we have to lock access to TechProfile processing based on different messageType calls or
+ // even to fast subsequent calls of the same messageType
+ dh.pOnuTP.lockTpProcMutex()
+ // lock hangs as long as below decoupled or other related TechProfile processing is active
+ if bTpModify := dh.pOnuTP.updateOnuUniTpPath(techProfMsg.UniId, techProfMsg.Path); bTpModify == true {
+ // if there has been some change for some uni TechProfilePath
+ //in order to allow concurrent calls to other dh instances we do not wait for execution here
+ //but doing so we can not indicate problems to the caller (who does what with that then?)
+ //by now we just assume straightforward successful execution
+ //TODO!!! Generally: In this scheme it would be good to have some means to indicate
+ // possible problems to the caller later autonomously
- // //compare TECH_PROFILE_DOWNLOAD_REQUEST
- // dh.tpProcMutex.Lock()
- // if bTpModify := dh.updateOnuUniTpPath(delTcontMsg.UniId, ""); bTpModify == true {
- // var wg sync.WaitGroup
- // wg.Add(2) // for the 1 go routine to finish
- // go dh.deleteTpRessource(delTcontMsg.UniId, delTcontMsg.TpPath,
- // cResourceTcont, delTcontMsg.AllocId, &wg)
- // // Removal of the tcont/alloc id mapping represents the removal of the tech profile
- // go dh.updateOnuTpPathKvStore(&wg)
- // //the wait.. function is responsible for tpProcMutex.Unlock()
- // go dh.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
- // } else {
- // dh.tpProcMutex.Unlock()
- // }
- // }
+ // some code to coordinate TP 'run to completion'
+ // attention: completion (wg.Done) is assumed to be signalled in both routines,
+ // no timeout control so far (needed)
+ var wg sync.WaitGroup
+ wg.Add(2) // for the 2 go routines to finish
+ go dh.pOnuTP.configureUniTp(techProfMsg.UniId, techProfMsg.Path, &wg)
+ go dh.pOnuTP.updateOnuTpPathKvStore(&wg)
+ //the wait.. function is responsible for tpProcMutex.Unlock()
+ go dh.pOnuTP.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
+ } else {
+ dh.pOnuTP.unlockTpProcMutex()
+ }
+ }
+ case ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST:
+ {
+ if dh.pOnuTP == nil {
+ //should normally not happen ...
+ logger.Warnw("onuTechProf instance not set up for DelGem request - ignoring request",
+ log.Fields{"deviceID": dh.deviceID})
+ return errors.New("TechProfile DelGem request while onuTechProf instance not setup")
+ }
+
+ msgBody := msg.GetBody()
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, delGemPortMsg); err != nil {
+ logger.Warnw("cannot-unmarshal-delete-gem-msg-body", log.Fields{
+ "deviceID": dh.deviceID, "error": err})
+ return err
+ }
+
+ //compare TECH_PROFILE_DOWNLOAD_REQUEST
+ dh.pOnuTP.lockTpProcMutex()
+ var wg sync.WaitGroup
+ wg.Add(1) // for the 1 go routine to finish
+ go dh.pOnuTP.deleteTpRessource(delGemPortMsg.UniId, delGemPortMsg.TpPath,
+ cResourceGemPort, delGemPortMsg.GemPortId, &wg)
+ //the wait.. function is responsible for tpProcMutex.Unlock()
+ go dh.pOnuTP.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
+ }
+ case ic.InterAdapterMessageType_DELETE_TCONT_REQUEST:
+ {
+ if dh.pOnuTP == nil {
+ //should normally not happen ...
+ logger.Warnw("onuTechProf instance not set up for DelTcont request - ignoring request",
+ log.Fields{"deviceID": dh.deviceID})
+ return errors.New("TechProfile DelTcont request while onuTechProf instance not setup")
+ }
+
+ msgBody := msg.GetBody()
+ delTcontMsg := &ic.InterAdapterDeleteTcontMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, delTcontMsg); err != nil {
+ logger.Warnw("cannot-unmarshal-delete-tcont-msg-body", log.Fields{
+ "deviceID": dh.deviceID, "error": err})
+ return err
+ }
+
+ //compare TECH_PROFILE_DOWNLOAD_REQUEST
+ dh.pOnuTP.lockTpProcMutex()
+ if bTpModify := dh.pOnuTP.updateOnuUniTpPath(delTcontMsg.UniId, ""); bTpModify == true {
+ var wg sync.WaitGroup
+ wg.Add(2) // for the 2 go routines to finish
+ go dh.pOnuTP.deleteTpRessource(delTcontMsg.UniId, delTcontMsg.TpPath,
+ cResourceTcont, delTcontMsg.AllocId, &wg)
+ // Removal of the tcont/alloc id mapping represents the removal of the tech profile
+ go dh.pOnuTP.updateOnuTpPathKvStore(&wg)
+ //the wait.. function is responsible for tpProcMutex.Unlock()
+ go dh.pOnuTP.waitForTpCompletion(&wg) //let that also run off-line to let the IA messaging return!
+ } else {
+ dh.pOnuTP.unlockTpProcMutex()
+ }
+ }
default:
{
logger.Errorw("inter-adapter-unhandled-type", log.Fields{
@@ -373,35 +369,6 @@
return errors.New("unimplemented")
}
}
-
- /* form py code:
- elif request.header.type == InterAdapterMessageType.TECH_PROFILE_DOWNLOAD_REQUEST:
- tech_msg = InterAdapterTechProfileDownloadMessage()
- request.body.Unpack(tech_msg)
- self.logger.debug('inter-adapter-recv-tech-profile', tech_msg=tech_msg)
-
- self.load_and_configure_tech_profile(tech_msg.uni_id, tech_msg.path)
-
- elif request.header.type == InterAdapterMessageType.DELETE_GEM_PORT_REQUEST:
- del_gem_msg = InterAdapterDeleteGemPortMessage()
- request.body.Unpack(del_gem_msg)
- self.logger.debug('inter-adapter-recv-del-gem', gem_del_msg=del_gem_msg)
-
- self.delete_tech_profile(uni_id=del_gem_msg.uni_id,
- gem_port_id=del_gem_msg.gem_port_id,
- tp_path=del_gem_msg.tp_path)
-
- elif request.header.type == InterAdapterMessageType.DELETE_TCONT_REQUEST:
- del_tcont_msg = InterAdapterDeleteTcontMessage()
- request.body.Unpack(del_tcont_msg)
- self.logger.debug('inter-adapter-recv-del-tcont', del_tcont_msg=del_tcont_msg)
-
- self.delete_tech_profile(uni_id=del_tcont_msg.uni_id,
- alloc_id=del_tcont_msg.alloc_id,
- tp_path=del_tcont_msg.tp_path)
- else:
- self.logger.error("inter-adapter-unhandled-type", request=request)
- */
return nil
}
@@ -465,54 +432,6 @@
}
}
-//GetOfpPortInfo returns the Voltha PortCapabilty with the logical port
-func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device,
- portNo int64) (*ic.PortCapability, error) {
- logger.Debugw("GetOfpPortInfo start", log.Fields{"deviceID": device.Id, "portNo": portNo})
-
- //function body as per OLTAdapter handler code
- // adapted with values from py dapter code
- if pUniPort, exist := dh.uniEntityMap[uint32(portNo)]; exist {
- var macOctets [6]uint8
- macOctets[5] = 0x08
- macOctets[4] = uint8(dh.ponPortNumber >> 8)
- macOctets[3] = uint8(dh.ponPortNumber)
- macOctets[2] = uint8(portNo >> 16)
- macOctets[1] = uint8(portNo >> 8)
- macOctets[0] = uint8(portNo)
- hwAddr := genMacFromOctets(macOctets)
- capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
- name := device.SerialNumber + "-" + strconv.FormatUint(uint64(pUniPort.macBpNo), 10)
- ofUniPortState := of.OfpPortState_OFPPS_LINK_DOWN
- if pUniPort.operState == vc.OperStatus_ACTIVE {
- ofUniPortState = of.OfpPortState_OFPPS_LIVE
- }
- logger.Debugw("setting LogicalPort", log.Fields{"with-name": name,
- "withUniPort": pUniPort.name, "withMacBase": hwAddr, "OperState": ofUniPortState})
-
- return &ic.PortCapability{
- Port: &voltha.LogicalPort{
- OfpPort: &of.OfpPort{
- Name: name,
- //HwAddr: macAddressToUint32Array(dh.device.MacAddress),
- HwAddr: macAddressToUint32Array(hwAddr),
- Config: 0,
- State: uint32(ofUniPortState),
- Curr: capacity,
- Advertised: capacity,
- Peer: capacity,
- CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
- },
- DeviceId: device.Id,
- DevicePortNo: uint32(portNo),
- },
- }, nil
- }
- logger.Warnw("No UniPort found - abort", log.Fields{"for PortNo": uint32(portNo)})
- return nil, errors.New("UniPort not found")
-}
-
// DeviceHandler methods that implement the adapters interface requests## end #########
// #####################################################################################
@@ -772,10 +691,12 @@
}
//SetOnuDeviceEntry sets the ONU device entry within the handler
-func (dh *DeviceHandler) SetOnuDeviceEntry(pDeviceEntry *OnuDeviceEntry) error {
+func (dh *DeviceHandler) SetOnuDeviceEntry(
+ apDeviceEntry *OnuDeviceEntry, apOnuTp *OnuUniTechProf) error {
dh.lockDevice.Lock()
defer dh.lockDevice.Unlock()
- dh.pOnuOmciDevice = pDeviceEntry
+ dh.pOnuOmciDevice = apDeviceEntry
+ dh.pOnuTP = apOnuTp
return nil
}
@@ -789,11 +710,13 @@
we omit that here first (declaration unclear) -> todo at Adapter specialization ...*/
/* also no 'clock' argument - usage open ...*/
/* and no alarm_db yet (oo.alarm_db) */
- deviceEntry = NewOnuDeviceEntry(ctx, dh.deviceID, dh.pOpenOnuAc.KVStoreHost, dh.pOpenOnuAc.KVStorePort, dh.pOpenOnuAc.KVStoreType,
+ deviceEntry = NewOnuDeviceEntry(ctx, dh.deviceID, dh.pOpenOnuAc.KVStoreHost,
+ dh.pOpenOnuAc.KVStorePort, dh.pOpenOnuAc.KVStoreType,
dh, dh.coreProxy, dh.AdapterProxy,
dh.pOpenOnuAc.pSupportedFsms) //nil as FSM pointer would yield deviceEntry internal defaults ...
+ onuTechProfProc := NewOnuUniTechProf(ctx, dh.deviceID, dh)
//error treatment possible //TODO!!!
- dh.SetOnuDeviceEntry(deviceEntry)
+ dh.SetOnuDeviceEntry(deviceEntry, onuTechProfProc)
// fire deviceEntry ready event to spread to possibly waiting processing
dh.deviceEntrySet <- true
logger.Infow("onuDeviceEntry-added", log.Fields{"for deviceId": dh.deviceID})
@@ -818,7 +741,7 @@
// It does not look to me as if makes sense to work with the real core device here, (not the stored clone)?
// in this code the GetDevice would just make a check if the DeviceID's Device still exists in core
// in python code it looks as the started onu_omci_device might have been updated with some new instance state of the core device
- // but I would not know why, and the go code anyway dows not work with the device directly anymore in the OnuDeviceEntry
+ // but I would not know why, and the go code anyway does not work with the device directly anymore in the OnuDeviceEntry
// so let's just try to keep it simple ...
/*
device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
@@ -1348,101 +1271,18 @@
}
}
-/* **** Traffic Profile related processing **********/
-// updateOnuUniTpPath verifies and updates changes in the dh.onuUniTpPath
-func (dh *DeviceHandler) updateOnuUniTpPath(aUniID uint32, aPathString string) bool {
- /* within some specific InterAdapter processing request write/read access to data is ensured to be sequentially,
- as also the complete sequence is ensured to 'run to completion' before some new request is accepted
- no specific concurrency protection to sOnuPersistentData is required here
- */
- if existingPath, present := dh.sOnuPersistentData.persUniTpPath[aUniID]; present {
- // uni entry already exists
- //logger.Debugw(" already exists", log.Fields{"for InstanceId": a_uniInstNo})
- if existingPath != aPathString {
- if aPathString == "" {
- //existing entry to be deleted
- logger.Debugw("UniTp path delete", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString})
- delete(dh.sOnuPersistentData.persUniTpPath, aUniID)
- } else {
- //existing entry to be modified
- logger.Debugw("UniTp path modify", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString})
- dh.sOnuPersistentData.persUniTpPath[aUniID] = aPathString
- }
- return true
- }
- //entry already exists
- logger.Debugw("UniTp path already exists", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString})
- return false
- } else {
- //uni entry does not exist
- if aPathString == "" {
- //delete request in non-existing state , accept as no change
- logger.Debugw("UniTp path already removed", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID})
- return false
- }
- //new entry to be set
- logger.Debugw("New UniTp path set", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString})
- dh.sOnuPersistentData.persUniTpPath[aUniID] = aPathString
- return true
- }
-}
+//SetBackend provides a DB backend for the specified path on the existing KV client
+func (dh *DeviceHandler) SetBackend(aBasePathKvStore string) *db.Backend {
+ addr := dh.pOpenOnuAc.KVStoreHost + ":" + strconv.Itoa(dh.pOpenOnuAc.KVStorePort)
+ logger.Debugw("SetKVStoreBackend", log.Fields{"IpTarget": addr,
+ "BasePathKvStore": aBasePathKvStore, "deviceId": dh.deviceID})
+ kvbackend := &db.Backend{
+ Client: dh.pOpenOnuAc.kvClient,
+ StoreType: dh.pOpenOnuAc.KVStoreType,
+ /* address config update acc. to [VOL-2736] */
+ Address: addr,
+ Timeout: dh.pOpenOnuAc.KVStoreTimeout,
+ PathPrefix: aBasePathKvStore}
-func (dh *DeviceHandler) configureUniTp(aUniID uint32, aPathString string, wg *sync.WaitGroup) {
- defer wg.Done()
- logger.Debugw("this would configure the Uni according to TpPath", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString})
- //TODO!!!
- //this processing requires reading of the TechProfile config data from KV-Store,
- // to evaluate the configuration and to start the corresponding OMCI configuation of the UNI port
-}
-
-func (dh *DeviceHandler) updateOnuTpPathKvStore(wg *sync.WaitGroup) {
- defer wg.Done()
- logger.Debugw("this would update the ONU's TpPath in KVStore", log.Fields{
- "deviceID": dh.deviceID})
- //TODO!!!
- //make use of dh.sOnuPersistentData to store the TpPath to KVStore
-}
-
-// deleteTpRessource removes ressources from the ONU's specified Uni
-func (dh *DeviceHandler) deleteTpRessource(aUniID uint32, aPathString string,
- aRessource resourceEntry, aEntryID uint32, wg *sync.WaitGroup) {
- defer wg.Done()
- logger.Debugw("this would remove TP resources from ONU's UNI", log.Fields{
- "deviceID": dh.deviceID, "uniID": aUniID, "path": aPathString, "ressource": aRessource})
- //TODO!!!
-}
-
-func (dh *DeviceHandler) waitForTpCompletion(wg *sync.WaitGroup) {
- wg.Wait()
- logger.Debug("some TechProfile Processing completed")
- dh.tpProcMutex.Unlock() //allow further TP related processing
-}
-
-/* *********************************************************** */
-
-func genMacFromOctets(aOctets [6]uint8) string {
- return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
- aOctets[5], aOctets[4], aOctets[3],
- aOctets[2], aOctets[1], aOctets[0])
-}
-
-//copied from OLT Adapter: unify centrally ?
-func macAddressToUint32Array(mac string) []uint32 {
- slist := strings.Split(mac, ":")
- result := make([]uint32, len(slist))
- var err error
- var tmp int64
- for index, val := range slist {
- if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
- return []uint32{1, 2, 3, 4, 5, 6}
- }
- result[index] = uint32(tmp)
- }
- return result
+ return kvbackend
}
diff --git a/internal/pkg/onuadaptercore/mib_sync.go b/internal/pkg/onuadaptercore/mib_sync.go
index 80470ed..88c8620 100644
--- a/internal/pkg/onuadaptercore/mib_sync.go
+++ b/internal/pkg/onuadaptercore/mib_sync.go
@@ -34,7 +34,6 @@
//"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/omci-lib-go"
me "github.com/opencord/omci-lib-go/generated"
- "github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
//ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -134,7 +133,7 @@
}
meStoredFromTemplate := false
- path := fmt.Sprintf(SuffixMibTemplateKvStore, onuDeviceEntry.vendorID, onuDeviceEntry.equipmentID, onuDeviceEntry.activeSwVersion)
+ path := fmt.Sprintf(cSuffixMibTemplateKvStore, onuDeviceEntry.vendorID, onuDeviceEntry.equipmentID, onuDeviceEntry.activeSwVersion)
logger.Debugw("MibSync FSM - MibTemplate - etcd search string", log.Fields{"path": path})
Value, err := onuDeviceEntry.mibTemplateKVStore.Get(context.TODO(), path)
if err == nil {
@@ -444,41 +443,6 @@
}
}
-func (onuDeviceEntry *OnuDeviceEntry) newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
- logger.Infow("kv-store-type", log.Fields{"store": storeType})
- switch storeType {
- case "consul":
- return kvstore.NewConsulClient(address, timeout)
- case "etcd":
- return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
- }
- return nil, errors.New("unsupported-kv-store")
-}
-
-func (onuDeviceEntry *OnuDeviceEntry) SetKVClient(backend string, Host string, Port int, BasePathKvStore string) *db.Backend {
- logger.Debugw("SetKVClient with params:", log.Fields{"backend": backend, "Host": Host, "Port": Port,
- "BasePathKvStore": BasePathKvStore, "deviceId": onuDeviceEntry.deviceID})
-
- addr := Host + ":" + strconv.Itoa(Port)
- // TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
- // issue between kv store and backend , core is not calling NewBackend directly
- kvClient, err := onuDeviceEntry.newKVClient(backend, addr, KvstoreTimeout)
- if err != nil {
- logger.Fatalw("Failed to init KV client\n", log.Fields{"err": err})
- return nil
- }
-
- kvbackend := &db.Backend{
- Client: kvClient,
- StoreType: backend,
- Host: Host,
- Port: Port,
- Timeout: KvstoreTimeout,
- PathPrefix: BasePathKvStore}
-
- return kvbackend
-}
-
func IsSupportedClassId(meClassId me.ClassID) bool {
for _, v := range supportedClassIds {
if v == meClassId {
diff --git a/internal/pkg/onuadaptercore/onu_device_entry.go b/internal/pkg/onuadaptercore/onu_device_entry.go
index 4d31f77..9b453ce 100644
--- a/internal/pkg/onuadaptercore/onu_device_entry.go
+++ b/internal/pkg/onuadaptercore/onu_device_entry.go
@@ -36,9 +36,8 @@
)
const (
- KvstoreTimeout = 5 //in seconds
- BasePathMibTemplateKvStore = "service/voltha/omci_mibs/templates"
- SuffixMibTemplateKvStore = "%s/%s/%s"
+ cBasePathMibTemplateKvStore = "service/voltha/omci_mibs/templates"
+ cSuffixMibTemplateKvStore = "%s/%s/%s"
)
type OnuDeviceEvent int
@@ -286,9 +285,9 @@
// some specifc error treatment - or waiting for crash ???
}
- onuDeviceEntry.mibTemplateKVStore = onuDeviceEntry.SetKVClient(kvStoreType, kVStoreHost, kVStorePort, BasePathMibTemplateKvStore)
+ onuDeviceEntry.mibTemplateKVStore = onuDeviceEntry.baseDeviceHandler.SetBackend(cBasePathMibTemplateKvStore)
if onuDeviceEntry.mibTemplateKVStore == nil {
- logger.Error("Failed to setup mibTemplateKVStore")
+ logger.Errorw("Failed to setup mibTemplateKVStore", log.Fields{"deviceID": device_id})
}
// Alarm Synchronization Database
diff --git a/internal/pkg/onuadaptercore/onu_uni_port.go b/internal/pkg/onuadaptercore/onu_uni_port.go
index a888207..2deecc3 100644
--- a/internal/pkg/onuadaptercore/onu_uni_port.go
+++ b/internal/pkg/onuadaptercore/onu_uni_port.go
@@ -20,7 +20,9 @@
import (
"context"
"errors"
+ "fmt"
"strconv"
+ "strings"
//"sync"
//"time"
@@ -28,10 +30,8 @@
//"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
vc "github.com/opencord/voltha-protos/v3/go/common"
+ of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- //ic "github.com/opencord/voltha-protos/v3/go/inter_container"
- //"github.com/opencord/voltha-protos/v3/go/openflow_13"
- //"github.com/opencord/voltha-protos/v3/go/voltha"
)
type UniPortType uint8
@@ -80,9 +80,37 @@
return &onuUniPort
}
-//creates the Voltha port based on ONU UNI Port
-func (oo *OnuUniPort) CreateVolthaPort(a_pDeviceHandler *DeviceHandler) error {
- logger.Debug("adding-uni-port")
+//CreateVolthaPort creates the Voltha port based on ONU UNI Port and informs the core about it
+func (oo *OnuUniPort) CreateVolthaPort(apDeviceHandler *DeviceHandler) error {
+ logger.Debugw("creating-voltha-uni-port", log.Fields{
+ "deviceID": apDeviceHandler.device.Id, "portNo": oo.portNo})
+ //200630: per [VOL-3202] OF port info is now to be delivered within UniPort create
+ // not doing so crashes rw_core processing (at least still in 200630 version)
+ name := apDeviceHandler.device.SerialNumber + "-" + strconv.FormatUint(uint64(oo.macBpNo), 10)
+ var macOctets [6]uint8
+ macOctets[5] = 0x08
+ //ponPortNumber was copied from device.ParentPortNo
+ macOctets[4] = uint8(apDeviceHandler.ponPortNumber >> 8)
+ macOctets[3] = uint8(apDeviceHandler.ponPortNumber)
+ macOctets[2] = uint8(oo.portNo >> 16)
+ macOctets[1] = uint8(oo.portNo >> 8)
+ macOctets[0] = uint8(oo.portNo)
+ hwAddr := genMacFromOctets(macOctets)
+ ofHwAddr := macAddressToUint32Array(hwAddr)
+ capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+ ofUniPortState := of.OfpPortState_OFPPS_LINK_DOWN
+ /* as the VOLTHA port create is only called directly after Uni Port create
+ the OfPortOperState is always Down
+ Note: this way the OfPortOperState won't ever change (directly in adapter)
+ maybe that was already always the case, but looks a bit weird - to be kept in mind ...
+ if pUniPort.operState == vc.OperStatus_ACTIVE {
+ ofUniPortState = of.OfpPortState_OFPPS_LIVE
+ }
+ */
+ logger.Debugw("ofPort values", log.Fields{
+ "forUniPortName": oo.name, "forMacBase": hwAddr,
+ "name": name, "hwAddr": ofHwAddr, "OperState": ofUniPortState})
+
pUniPort := &voltha.Port{
PortNo: oo.portNo,
Label: oo.name,
@@ -90,25 +118,59 @@
AdminState: oo.adminState,
OperStatus: oo.operState,
// obviously empty peer setting
+ OfpPort: &of.OfpPort{
+ Name: name,
+ HwAddr: ofHwAddr,
+ Config: 0,
+ State: uint32(ofUniPortState),
+ Curr: capacity,
+ Advertised: capacity,
+ Peer: capacity,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
}
if pUniPort != nil {
- if err := a_pDeviceHandler.coreProxy.PortCreated(context.TODO(),
- a_pDeviceHandler.deviceID, pUniPort); err != nil {
+ if err := apDeviceHandler.coreProxy.PortCreated(context.TODO(),
+ apDeviceHandler.deviceID, pUniPort); err != nil {
logger.Fatalf("adding-uni-port: create-VOLTHA-Port-failed-%s", err)
return err
}
- logger.Infow("Voltha onuUniPort-added", log.Fields{"for PortNo": oo.portNo})
+ logger.Infow("Voltha onuUniPort-added", log.Fields{
+ "deviceID": apDeviceHandler.device.Id, "PortNo": oo.portNo})
oo.pPort = pUniPort
oo.operState = vc.OperStatus_DISCOVERED
} else {
- logger.Warnw("could not create Voltha UniPort - nil pointer",
- log.Fields{"for PortNo": oo.portNo})
+ logger.Warnw("could not create Voltha UniPort", log.Fields{
+ "deviceID": apDeviceHandler.device.Id, "PortNo": oo.portNo})
return errors.New("create Voltha UniPort failed")
}
return nil
}
-//mofify OperState of the the UniPort
-func (oo *OnuUniPort) SetOperState(a_NewOperState vc.OperStatus_Types) {
- oo.operState = a_NewOperState
+//SetOperState modifies OperState of the UniPort
+func (oo *OnuUniPort) SetOperState(aNewOperState vc.OperStatus_Types) {
+ oo.operState = aNewOperState
+}
+
+// uni port related utility functions (so far only used here)
+func genMacFromOctets(aOctets [6]uint8) string {
+ return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
+ aOctets[5], aOctets[4], aOctets[3],
+ aOctets[2], aOctets[1], aOctets[0])
+}
+
+//copied from OLT Adapter: unify centrally ?
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
}
diff --git a/internal/pkg/onuadaptercore/onu_uni_tp.go b/internal/pkg/onuadaptercore/onu_uni_tp.go
new file mode 100644
index 0000000..1539b9a
--- /dev/null
+++ b/internal/pkg/onuadaptercore/onu_uni_tp.go
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package adaptercoreonu provides the utility for onu devices, flows and statistics
+package adaptercoreonu
+
+import (
+ "context"
+ "encoding/json"
+ "sync"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+)
+
+const cBasePathTechProfileKVStore = "service/voltha/technology_profiles" // base path handed to DeviceHandler.SetBackend() when the tech-profile KV backend is created
+
+type resourceEntry int // selects the kind of TechProfile resource; used as the aRessource argument of deleteTpRessource
+
+const (
+ cResourceGemPort resourceEntry = 1
+ cResourceTcont resourceEntry = 2
+)
+
+type onuSerialNumber struct { // serial number kept as two byte slices; stored in onuPersistentData.persSnr
+ sliceVendorID []byte
+ sliceVendorSpecific []byte
+}
+
+type onuPersistentData struct { // per-ONU data set held in OnuUniTechProf.sOnuPersistentData
+ persOnuID uint32
+ persIntfID uint32
+ persSnr onuSerialNumber
+ persAdminState string
+ persOperState string
+ persUniTpPath map[uint32]string // TechProfile path per UNI id; maintained by updateOnuUniTpPath
+ persUniTpData map[uint32]tp.TechProfile // TechProfile instance per UNI id; filled by configureUniTp after a successful KV read
+}
+
+//OnuUniTechProf structure holds information about the TechProfiles attached to Uni Ports of the ONU
+type OnuUniTechProf struct {
+ deviceID string // device id as passed to NewOnuUniTechProf, used in log output
+ baseDeviceHandler *DeviceHandler // handler this instance was created for
+ tpProcMutex sync.RWMutex // taken via lockTpProcMutex, released via unlockTpProcMutex or waitForTpCompletion
+ sOnuPersistentData onuPersistentData // both contained maps are allocated in NewOnuUniTechProf
+ techProfileKVStore *db.Backend // result of DeviceHandler.SetBackend(); may be nil, configureUniTp checks for that
+}
+
+//NewOnuUniTechProf returns the instance of a OnuUniTechProf
+//(one instance per ONU/deviceHandler for all possible UNI's); ctx is currently not used, and a missing KV backend is only logged (the instance is returned anyway)
+func NewOnuUniTechProf(ctx context.Context, aDeviceID string, aDeviceHandler *DeviceHandler) *OnuUniTechProf {
+ logger.Infow("init-OnuUniTechProf", log.Fields{"deviceId": aDeviceID})
+ var onuTP OnuUniTechProf
+ onuTP.deviceID = aDeviceID
+ onuTP.baseDeviceHandler = aDeviceHandler
+ onuTP.tpProcMutex = sync.RWMutex{} // explicit reset to the (unlocked) zero value
+ onuTP.sOnuPersistentData.persUniTpPath = make(map[uint32]string) // maps must exist before the first write
+ onuTP.sOnuPersistentData.persUniTpData = make(map[uint32]tp.TechProfile)
+
+ onuTP.techProfileKVStore = aDeviceHandler.SetBackend(cBasePathTechProfileKVStore) // aDeviceHandler is dereferenced here and must not be nil
+ if onuTP.techProfileKVStore == nil {
+ logger.Errorw("Can't access techProfileKVStore - no backend connection to service",
+ log.Fields{"deviceID": aDeviceID, "service": cBasePathTechProfileKVStore})
+ }
+ return &onuTP
+}
+
+// lockTpProcMutex takes the exclusive (write) lock of the OnuUniTechProf processing mutex and blocks until it is available
+func (onuTP *OnuUniTechProf) lockTpProcMutex() {
+ onuTP.tpProcMutex.Lock()
+}
+
+// unlockTpProcMutex releases the exclusive (write) lock of the OnuUniTechProf processing mutex; it must only be called while that lock is held
+func (onuTP *OnuUniTechProf) unlockTpProcMutex() {
+ onuTP.tpProcMutex.Unlock()
+}
+
+// updateOnuUniTpPath compares aPathString with the in-memory persUniTpPath entry of aUniID and sets, modifies or deletes (empty aPathString) that entry; it returns true only if the stored entry was changed
+func (onuTP *OnuUniTechProf) updateOnuUniTpPath(aUniID uint32, aPathString string) bool {
+ /* within a specific InterAdapter processing request, write/read access to the data is ensured to be sequential,
+ and the complete sequence is ensured to 'run to completion' before a new request is accepted,
+ so no specific concurrency protection of sOnuPersistentData is required here
+ */
+ if existingPath, present := onuTP.sOnuPersistentData.persUniTpPath[aUniID]; present {
+ // uni entry already exists
+ //logger.Debugw(" already exists", log.Fields{"for InstanceId": a_uniInstNo})
+ if existingPath != aPathString {
+ if aPathString == "" {
+ //existing entry to be deleted (an empty path is the delete request)
+ logger.Debugw("UniTp path delete", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString})
+ delete(onuTP.sOnuPersistentData.persUniTpPath, aUniID)
+ } else {
+ //existing entry to be modified
+ logger.Debugw("UniTp path modify", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString})
+ onuTP.sOnuPersistentData.persUniTpPath[aUniID] = aPathString
+ }
+ return true // entry was deleted or modified
+ }
+ //entry already exists with the identical path - no change
+ logger.Debugw("UniTp path already exists", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString})
+ return false
+ }
+ //uni entry does not exist
+ if aPathString == "" {
+ //delete request in non-existing state, accept as no change
+ logger.Debugw("UniTp path already removed", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID})
+ return false
+ }
+ //new entry to be set
+ logger.Debugw("New UniTp path set", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString})
+ onuTP.sOnuPersistentData.persUniTpPath[aUniID] = aPathString
+ return true // entry was newly created
+}
+
+// configureUniTp reads the tech-profile stored under aPathString from the KV store and caches it for aUniID in persUniTpData; failures are only logged and wg.Done() is signalled on every exit path
+func (onuTP *OnuUniTechProf) configureUniTp(aUniID uint32, aPathString string, wg *sync.WaitGroup) {
+	defer wg.Done()
+	logger.Debugw("configure the Uni according to TpPath", log.Fields{
+		"deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString})
+	//TODO!!!
+	// reaction on existing tp, deletion of tp, start the corresponding OMCI configuration of the UNI port
+	if onuTP.techProfileKVStore == nil {
+		logger.Debug("techProfileKVStore not set - abort")
+		return
+	}
+	kvPair, err := onuTP.techProfileKVStore.Get(context.TODO(), aPathString)
+	if err != nil {
+		logger.Errorw("kvstore-get failed for path",
+			log.Fields{"path": aPathString, "device-id": onuTP.deviceID, "error": err})
+		return
+	}
+	if kvPair == nil {
+		logger.Debugw("No tech-profile found", log.Fields{"path": aPathString, "device-id": onuTP.deviceID})
+		return
+	}
+	logger.Debugw("tech-profile read", log.Fields{"Key": kvPair.Key, "Value": kvPair.Value})
+	tpTmpBytes, err := kvstore.ToByte(kvPair.Value) // conversion errors are reported instead of being dropped
+	if err != nil {
+		logger.Errorw("TechProf - Failed to convert tech-profile value to bytes",
+			log.Fields{"error": err, "device-id": onuTP.deviceID})
+		return
+	}
+	var tpInst tp.TechProfile
+	if err = json.Unmarshal(tpTmpBytes, &tpInst); err != nil {
+		logger.Errorw("TechProf - Failed to unmarshal tech-profile into tpInst",
+			log.Fields{"error": err, "device-id": onuTP.deviceID})
+		return
+	}
+	logger.Debugw("TechProf - tpInst", log.Fields{"tpInst": tpInst})
+	onuTP.sOnuPersistentData.persUniTpData[aUniID] = tpInst
+	// access examples (tpInst holds the same values as the map entry stored just above)
+	logger.Debugw("TechProf - name",
+		log.Fields{"onuTP.sOnuPersistentData.persUniTpData[aUniID].Name": tpInst.Name})
+	logger.Debugw("TechProf - instance_control.max_gem_payload_size",
+		log.Fields{"onuTP.sOnuPersistentData.persUniTpData[aUniID].InstanceCtrl.MaxGemPayloadSize": tpInst.InstanceCtrl.MaxGemPayloadSize})
+	if len(tpInst.DownstreamGemPortAttributeList) > 0 { // an empty list must not panic on the [0] access below
+		logger.Debugw("TechProf - downstream_gem_port_attribute_list.discard_config.max_threshold",
+			log.Fields{"onuTP.sOnuPersistentData.persUniTpData[aUniID].DownstreamGemPortAttributeList[0].DiscardConfig.MaxThreshold": tpInst.DownstreamGemPortAttributeList[0].DiscardConfig.MaxThreshold})
+	}
+}
+
+func (onuTP *OnuUniTechProf) updateOnuTpPathKvStore(wg *sync.WaitGroup) { // placeholder: currently only logs and signals wg
+ defer wg.Done() // signal completion to the WaitGroup on every exit path
+ logger.Debugw("this would update the ONU's TpPath in KVStore", log.Fields{
+ "deviceID": onuTP.deviceID})
+ //TODO!!! not implemented yet - nothing is written to the KVStore so far
+ //make use of onuTP.sOnuPersistentData to store the TpPath to KVStore
+}
+
+// deleteTpRessource is meant to remove resources from the ONU's specified Uni; so far it only logs the request and signals wg.Done()
+func (onuTP *OnuUniTechProf) deleteTpRessource(aUniID uint32, aPathString string,
+ aRessource resourceEntry, aEntryID uint32, wg *sync.WaitGroup) {
+ defer wg.Done() // signal completion to the WaitGroup on every exit path
+ logger.Debugw("this would remove TP resources from ONU's UNI", log.Fields{
+ "deviceID": onuTP.deviceID, "uniID": aUniID, "path": aPathString, "ressource": aRessource})
+ //TODO!!! actual removal not implemented yet; aEntryID is currently unused
+}
+
+func (onuTP *OnuUniTechProf) waitForTpCompletion(wg *sync.WaitGroup) { // waits for wg, then releases tpProcMutex
+ wg.Wait() // blocks until the WaitGroup counter has dropped to zero
+ logger.Debug("some TechProfile Processing completed")
+ onuTP.tpProcMutex.Unlock() //allow further TP related processing; NOTE(review): presumably the caller took the lock via lockTpProcMutex before starting the workers - confirm
+}
diff --git a/internal/pkg/onuadaptercore/openonu.go b/internal/pkg/onuadaptercore/openonu.go
index 253f901..1c72b4c 100644
--- a/internal/pkg/onuadaptercore/openonu.go
+++ b/internal/pkg/onuadaptercore/openonu.go
@@ -25,6 +25,7 @@
"time"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -41,11 +42,13 @@
adapterProxy adapterif.AdapterProxy
eventProxy adapterif.EventProxy
kafkaICProxy kafka.InterContainerProxy
+ kvClient kvstore.Client
config *config.AdapterFlags
numOnus int
KVStoreHost string
KVStorePort int
KVStoreType string
+ KVStoreTimeout time.Duration
exitChannel chan int
HeartbeatCheckInterval time.Duration
HeartbeatFailReportInterval time.Duration
@@ -57,7 +60,7 @@
//NewOpenONUAC returns a new instance of OpenONU_AC
func NewOpenONUAC(ctx context.Context, kafkaICProxy kafka.InterContainerProxy,
coreProxy adapterif.CoreProxy, adapterProxy adapterif.AdapterProxy,
- eventProxy adapterif.EventProxy, cfg *config.AdapterFlags) *OpenONUAC {
+ eventProxy adapterif.EventProxy, kvClient kvstore.Client, cfg *config.AdapterFlags) *OpenONUAC {
var openOnuAc OpenONUAC
openOnuAc.exitChannel = make(chan int, 1)
openOnuAc.deviceHandlers = make(map[string]*DeviceHandler)
@@ -67,9 +70,11 @@
openOnuAc.coreProxy = coreProxy
openOnuAc.adapterProxy = adapterProxy
openOnuAc.eventProxy = eventProxy
+ openOnuAc.kvClient = kvClient
openOnuAc.KVStoreHost = cfg.KVStoreHost
openOnuAc.KVStorePort = cfg.KVStorePort
openOnuAc.KVStoreType = cfg.KVStoreType
+ openOnuAc.KVStoreTimeout = cfg.KVStoreTimeout
openOnuAc.HeartbeatCheckInterval = cfg.HeartbeatCheckInterval
openOnuAc.HeartbeatFailReportInterval = cfg.HeartbeatFailReportInterval
//openOnuAc.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
@@ -166,19 +171,8 @@
}
//Get_ofp_port_info returns OFP port information for the given device
-func (oo *OpenONUAC) Get_ofp_port_info(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
- //this method expects a return value to be sent to the core
- // and internal processing should not take that long
- // so it makes no sense to try to work asynchronously here
- logger.Infow("get-ofp-port-info started", log.Fields{"deviceId": device.Id, "portNo": portNo})
- // basically the same code as in openOlt.go - unify???
- if handler := oo.getDeviceHandler(device.Id); handler != nil {
- return handler.GetOfpPortInfo(device, portNo)
- // error treatment might be more sophisticated, but indeed it would be logged within handler
- }
- return nil, fmt.Errorf(fmt.Sprintf("handler-not-found for deviceId %s", device.Id))
- //return nil, olterrors.NewErrNotFound("device-handler", log.Fields{"device-id": device.Id}, nil)
-}
+//200630: method removed as per [VOL-3202]: OF port info is now to be delivered within UniPort create
+// compare the related changes in onu_uni_port.go::CreateVolthaPort()
//Process_inter_adapter_message sends messages to a target device (between adapters)
func (oo *OpenONUAC) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {