VOL-2909 - Disaggregating rw_core/core/.
This breaks the core package into logical components. (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" which aggregates all these.
Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
new file mode 100644
index 0000000..2143a84
--- /dev/null
+++ b/rw_core/core/adapter/agent.go
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapter
+
+import (
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "sync"
+ "time"
+)
+
+// agent represents adapter agent
+type agent struct {
+ adapter *voltha.Adapter
+ lock sync.RWMutex
+}
+
+func newAdapterAgent(adapter *voltha.Adapter) *agent {
+ return &agent{
+ adapter: adapter,
+ }
+}
+
+func (aa *agent) getAdapter() *voltha.Adapter {
+ aa.lock.RLock()
+ defer aa.lock.RUnlock()
+ logger.Debugw("getAdapter", log.Fields{"adapter": aa.adapter})
+ return aa.adapter
+}
+
+// updateCommunicationTime updates the message to the specified time.
+// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+func (aa *agent) updateCommunicationTime(new time.Time) {
+ // only update if new time is not in the future, and either the old time is invalid or new time > old time
+ if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
+ timestamp, err := ptypes.TimestampProto(new)
+ if err != nil {
+ return // if the new time cannot be encoded, just ignore it
+ }
+
+ aa.lock.Lock()
+ defer aa.lock.Unlock()
+ aa.adapter.LastCommunication = timestamp
+ }
+}
diff --git a/rw_core/core/adapter/common.go b/rw_core/core/adapter/common.go
new file mode 100644
index 0000000..d539a56
--- /dev/null
+++ b/rw_core/core/adapter/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package adapter
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "adapter"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter/manager.go
similarity index 74%
rename from rw_core/core/adapter_manager.go
rename to rw_core/core/adapter/manager.go
index 889ad74..b6caea2 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package adapter
import (
"context"
@@ -22,7 +22,6 @@
"sync"
"time"
- "github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/gogo/protobuf/proto"
@@ -32,75 +31,39 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-// AdapterAgent represents adapter agent
-type AdapterAgent struct {
- adapter *voltha.Adapter
- deviceTypes map[string]*voltha.DeviceType
- lock sync.RWMutex
-}
-
-func newAdapterAgent(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *AdapterAgent {
- var adapterAgent AdapterAgent
- adapterAgent.adapter = adapter
- adapterAgent.lock = sync.RWMutex{}
- adapterAgent.deviceTypes = make(map[string]*voltha.DeviceType)
- if deviceTypes != nil {
- for _, dType := range deviceTypes.Items {
- adapterAgent.deviceTypes[dType.Id] = dType
- }
- }
- return &adapterAgent
-}
-
-func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
- logger.Debugw("getAdapter", log.Fields{"adapter": aa.adapter})
- return aa.adapter
-}
-
-// updateCommunicationTime updates the message to the specified time.
-// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
-func (aa *AdapterAgent) updateCommunicationTime(new time.Time) {
- // only update if new time is not in the future, and either the old time is invalid or new time > old time
- if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
- timestamp, err := ptypes.TimestampProto(new)
- if err != nil {
- return // if the new time cannot be encoded, just ignore it
- }
-
- aa.lock.Lock()
- defer aa.lock.Unlock()
- aa.adapter.LastCommunication = timestamp
- }
-}
-
-// AdapterManager represents adapter manager attributes
-type AdapterManager struct {
- adapterAgents map[string]*AdapterAgent
+// Manager represents adapter manager attributes
+type Manager struct {
+ adapterAgents map[string]*agent
deviceTypes map[string]*voltha.DeviceType
clusterDataProxy *model.Proxy
- deviceMgr *DeviceManager
+ onAdapterRestart adapterRestartedHandler
coreInstanceID string
exitChannel chan int
lockAdaptersMap sync.RWMutex
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client, deviceMgr *DeviceManager) *AdapterManager {
- aMgr := &AdapterManager{
+func NewAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+ aMgr := &Manager{
exitChannel: make(chan int, 1),
coreInstanceID: coreInstanceID,
clusterDataProxy: cdProxy,
deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*AdapterAgent),
- deviceMgr: deviceMgr,
+ adapterAgents: make(map[string]*agent),
}
kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
return aMgr
}
-func (aMgr *AdapterManager) start(ctx context.Context) error {
+// an interface type for callbacks
+// if more than one callback is required, this should be converted to a proper interface
+type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
+
+func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+ aMgr.onAdapterRestart = onAdapterRestart
+}
+
+func (aMgr *Manager) Start(ctx context.Context) error {
logger.Info("starting-adapter-manager")
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
@@ -117,7 +80,7 @@
}
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() error {
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
// Load the adapters
var adapters []*voltha.Adapter
if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
@@ -154,7 +117,7 @@
return nil
}
-func (aMgr *AdapterManager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
+func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp int64) {
aMgr.lockAdaptersMap.RLock()
adapterAgent, have := aMgr.adapterAgents[adapterID]
aMgr.lockAdaptersMap.RUnlock()
@@ -164,7 +127,7 @@
}
}
-func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
+func (aMgr *Manager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
@@ -191,12 +154,12 @@
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
- aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, nil)
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
}
return nil
}
-func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
+func (aMgr *Manager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
if deviceTypes == nil {
return fmt.Errorf("no-device-type")
}
@@ -232,7 +195,7 @@
return nil
}
-func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
+func (aMgr *Manager) ListAdapters(ctx context.Context) (*voltha.Adapters, error) {
result := &voltha.Adapters{Items: []*voltha.Adapter{}}
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
@@ -244,7 +207,7 @@
return result, nil
}
-func (aMgr *AdapterManager) getAdapter(adapterID string) *voltha.Adapter {
+func (aMgr *Manager) getAdapter(adapterID string) *voltha.Adapter {
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
@@ -253,14 +216,14 @@
return nil
}
-func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw("registerAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+func (aMgr *Manager) RegisterAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
+ logger.Debugw("RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
if aMgr.getAdapter(adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
go func() {
- err := aMgr.deviceMgr.adapterRestarted(context.Background(), adapter)
+ err := aMgr.onAdapterRestart(context.Background(), adapter)
if err != nil {
logger.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
}
@@ -283,8 +246,8 @@
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
-// getAdapterType returns the name of the device adapter that service this device type
-func (aMgr *AdapterManager) getAdapterType(deviceType string) (string, error) {
+// GetAdapterType returns the name of the device adapter that service this device type
+func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
for _, adapterAgent := range aMgr.adapterAgents {
@@ -295,7 +258,7 @@
return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
}
-func (aMgr *AdapterManager) listDeviceTypes() []*voltha.DeviceType {
+func (aMgr *Manager) ListDeviceTypes() []*voltha.DeviceType {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
@@ -309,7 +272,7 @@
}
// getDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *AdapterManager) getDeviceType(deviceType string) *voltha.DeviceType {
+func (aMgr *Manager) GetDeviceType(deviceType string) *voltha.DeviceType {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
similarity index 94%
rename from rw_core/core/adapter_request_handler.go
rename to rw_core/core/api/adapter_request_handler.go
index 9ea44a1..7175a2b 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -14,11 +14,13 @@
* limitations under the License.
*/
-package core
+package api
import (
"context"
"errors"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device"
"time"
"github.com/golang/protobuf/ptypes"
@@ -33,25 +35,21 @@
// AdapterRequestHandlerProxy represent adapter request handler proxy attributes
type AdapterRequestHandlerProxy struct {
coreInstanceID string
- deviceMgr *DeviceManager
- lDeviceMgr *LogicalDeviceManager
- adapterMgr *AdapterManager
+ deviceMgr *device.Manager
+ adapterMgr *adapter.Manager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
defaultRequestTimeout time.Duration
longRunningRequestTimeout time.Duration
- core *Core
}
// NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
-func NewAdapterRequestHandlerProxy(core *Core, coreInstanceID string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
- aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, longRunningRequestTimeout time.Duration,
+func NewAdapterRequestHandlerProxy(coreInstanceID string, dMgr *device.Manager,
+ aMgr *adapter.Manager, cdProxy *model.Proxy, ldProxy *model.Proxy, longRunningRequestTimeout time.Duration,
defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
- proxy.core = core
proxy.coreInstanceID = coreInstanceID
proxy.deviceMgr = dMgr
- proxy.lDeviceMgr = ldMgr
proxy.clusterDataProxy = cdProxy
proxy.localDataProxy = ldProxy
proxy.adapterMgr = aMgr
@@ -90,7 +88,7 @@
}
logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
- return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
+ return rhp.adapterMgr.RegisterAdapter(adapter, deviceTypes)
}
// GetDevice returns device info
@@ -153,7 +151,7 @@
}
logger.Debugw("DeviceUpdate", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
+ if err := rhp.deviceMgr.UpdateDeviceUsingAdapterData(context.TODO(), device); err != nil {
logger.Debugw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
return nil, err
}
@@ -267,7 +265,7 @@
}
logger.Debugw("GetPorts", log.Fields{"deviceID": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
- return rhp.deviceMgr.getPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
+ return rhp.deviceMgr.GetPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
}
// GetChildDevices gets all the child device IDs from the device passed as parameter
@@ -296,7 +294,7 @@
}
logger.Debugw("GetChildDevices", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
- return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
+ return rhp.deviceMgr.GetAllChildDevices(context.TODO(), pID.Id)
}
// ChildDeviceDetected is invoked when a child device is detected. The following parameters are expected:
@@ -364,7 +362,7 @@
"deviceType": dt.Val, "channelID": chnlID.Val, "serialNumber": serialNumber.Val,
"vendorID": vendorID.Val, "onuID": onuID.Val, "transactionID": transactionID.Val})
- device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+ device, err := rhp.deviceMgr.ChildDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
if err != nil {
logger.Debugw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
}
@@ -409,7 +407,7 @@
logger.Debugw("DeviceStateUpdate", log.Fields{"deviceID": deviceID.Id, "oper-status": operStatus,
"conn-status": connStatus, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ if err := rhp.deviceMgr.UpdateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
logger.Debugw("unable-to-update-device-status", log.Fields{"error": err})
return nil, err
@@ -456,7 +454,7 @@
"conn-status": connStatus, "transactionID": transactionID.Val})
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- if err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ if err := rhp.deviceMgr.UpdateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
logger.Debugw("unable-to-update-children-status", log.Fields{"error": err})
return nil, err
@@ -495,7 +493,7 @@
}
logger.Debugw("PortsStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
+ if err := rhp.deviceMgr.UpdatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
logger.Debugw("unable-to-update-ports-state", log.Fields{"error": err})
return nil, err
}
@@ -546,7 +544,7 @@
logger.Debugw("PortStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus,
"portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ if err := rhp.deviceMgr.UpdatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
voltha.OperStatus_Types(operStatus.Val)); err != nil {
// If the error doesn't change behavior and is essentially ignored, it is not an error, it is a
// warning.
@@ -582,7 +580,7 @@
}
logger.Debugw("DeleteAllPorts", log.Fields{"deviceID": deviceID.Id, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id); err != nil {
+ if err := rhp.deviceMgr.DeleteAllPorts(context.TODO(), deviceID.Id); err != nil {
logger.Debugw("unable-to-delete-ports", log.Fields{"error": err})
return nil, err
}
@@ -615,7 +613,7 @@
}
logger.Debugw("ChildDevicesLost", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
+ if err := rhp.deviceMgr.ChildDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("unable-to-disable-child-devices", log.Fields{"error": err})
return nil, err
}
@@ -648,7 +646,7 @@
}
logger.Debugw("ChildDevicesDetected", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
+ if err := rhp.deviceMgr.ChildDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("child-devices-detection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
return nil, err
}
@@ -686,7 +684,7 @@
}
logger.Debugw("PortCreated", log.Fields{"deviceID": deviceID.Id, "port": port, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port); err != nil {
+ if err := rhp.deviceMgr.AddPort(context.TODO(), deviceID.Id, port); err != nil {
logger.Debugw("unable-to-add-port", log.Fields{"error": err})
return nil, err
}
@@ -719,7 +717,7 @@
logger.Debugw("DevicePMConfigUpdate", log.Fields{"deviceID": pmConfigs.Id, "configs": pmConfigs,
"transactionID": transactionID.Val})
- if err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
+ if err := rhp.deviceMgr.InitPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
logger.Debugw("unable-to-initialize-pm-configs", log.Fields{"error": err})
return nil, err
}
@@ -804,7 +802,7 @@
logger.Debugw("UpdateImageDownload", log.Fields{"deviceID": deviceID.Id, "image-download": img,
"transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
+ if err := rhp.deviceMgr.UpdateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
logger.Debugw("unable-to-update-image-download", log.Fields{"error": err})
return nil, err
}
@@ -836,7 +834,7 @@
}
logger.Debugw("ReconcileChildDevices", log.Fields{"deviceID": parentDeviceID.Id, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
+ if err := rhp.deviceMgr.ReconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
logger.Debugw("unable-to-reconcile-child-devices", log.Fields{"error": err})
return nil, err
}
@@ -875,7 +873,7 @@
logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceID.Id, "reason": reason.Val,
"transactionID": transactionID.Val})
- if err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
+ if err := rhp.deviceMgr.UpdateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
logger.Debugw("unable-to-update-device-reason", log.Fields{"error": err})
return nil, err
diff --git a/rw_core/core/api/common.go b/rw_core/core/api/common.go
new file mode 100644
index 0000000..fd2910a
--- /dev/null
+++ b/rw_core/core/api/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package api
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "nbi"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/common_test.go b/rw_core/core/api/common_test.go
similarity index 96%
rename from rw_core/core/common_test.go
rename to rw_core/core/api/common_test.go
index 7591c18..f6f07f5 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package api
import (
"context"
@@ -138,7 +138,7 @@
func waitUntilDeviceReadiness(deviceID string,
timeout time.Duration,
verificationFunction isDeviceConditionSatisfied,
- nbi *APIHandler) error {
+ nbi *NBIHandler) error {
ch := make(chan int, 1)
done := false
go func() {
@@ -167,7 +167,7 @@
func waitUntilLogicalDeviceReadiness(oltDeviceID string,
timeout time.Duration,
- nbi *APIHandler,
+ nbi *NBIHandler,
verificationFunction isLogicalDeviceConditionSatisfied,
) error {
ch := make(chan int, 1)
@@ -208,7 +208,7 @@
}
}
-func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
+func waitUntilConditionForDevices(timeout time.Duration, nbi *NBIHandler, verificationFunction isDevicesConditionSatisfied) error {
ch := make(chan int, 1)
done := false
go func() {
@@ -236,7 +236,7 @@
}
}
-func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
+func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *NBIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
ch := make(chan int, 1)
done := false
go func() {
@@ -264,7 +264,7 @@
}
}
-func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
+func waitUntilCondition(timeout time.Duration, nbi *NBIHandler, verificationFunction isConditionSatisfied) error {
ch := make(chan int, 1)
done := false
go func() {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/api/grpc_nbi_handler.go
similarity index 75%
rename from rw_core/core/grpc_nbi_api_handler.go
rename to rw_core/core/api/grpc_nbi_handler.go
index 30ea5ed..1c296dd 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -14,19 +14,17 @@
* limitations under the License.
*/
-package core
+package api
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
- "io"
- "sync"
- "time"
-
"github.com/golang/protobuf/ptypes/empty"
da "github.com/opencord/voltha-go/common/core/northbound/grpc"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/version"
"github.com/opencord/voltha-protos/v3/go/common"
@@ -35,6 +33,8 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "io"
+ "sync"
)
// Image related constants
@@ -45,38 +45,29 @@
RevertImage = iota
)
-// APIHandler represent attributes of API handler
-type APIHandler struct {
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
- adapterMgr *AdapterManager
- packetInQueue chan openflow_13.PacketIn
- changeEventQueue chan openflow_13.ChangeEvent
- packetInQueueDone chan bool
- changeEventQueueDone chan bool
- coreInCompetingMode bool
- longRunningRequestTimeout time.Duration
- defaultRequestTimeout time.Duration
+// NBIHandler represent attributes of API handler
+type NBIHandler struct {
+ deviceMgr *device.Manager
+ logicalDeviceMgr *device.LogicalManager
+ adapterMgr *adapter.Manager
+ packetInQueue chan openflow_13.PacketIn
+ changeEventQueue chan openflow_13.ChangeEvent
+ packetInQueueDone chan bool
+ changeEventQueueDone chan bool
da.DefaultAPIHandler
- core *Core
}
// NewAPIHandler creates API handler instance
-func NewAPIHandler(core *Core) *APIHandler {
- handler := &APIHandler{
- deviceMgr: core.deviceMgr,
- logicalDeviceMgr: core.logicalDeviceMgr,
- adapterMgr: core.adapterMgr,
- coreInCompetingMode: core.config.InCompetingMode,
- longRunningRequestTimeout: core.config.LongRunningRequestTimeout,
- defaultRequestTimeout: core.config.DefaultRequestTimeout,
- packetInQueue: make(chan openflow_13.PacketIn, 100),
- changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
- packetInQueueDone: make(chan bool, 1),
- changeEventQueueDone: make(chan bool, 1),
- core: core,
+func NewAPIHandler(deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
+ return &NBIHandler{
+ deviceMgr: deviceMgr,
+ logicalDeviceMgr: logicalDeviceMgr,
+ adapterMgr: adapterMgr,
+ packetInQueue: make(chan openflow_13.PacketIn, 100),
+ changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
+ packetInQueueDone: make(chan bool, 1),
+ changeEventQueueDone: make(chan bool, 1),
}
- return handler
}
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel monitorCh where an nil
@@ -100,67 +91,67 @@
}
// ListCoreInstances returns details on the running core containers
-func (handler *APIHandler) ListCoreInstances(ctx context.Context, empty *empty.Empty) (*voltha.CoreInstances, error) {
+func (handler *NBIHandler) ListCoreInstances(ctx context.Context, empty *empty.Empty) (*voltha.CoreInstances, error) {
logger.Debug("ListCoreInstances")
// TODO: unused stub
return &voltha.CoreInstances{}, status.Errorf(codes.NotFound, "no-core-instances")
}
// GetCoreInstance returns the details of a specific core container
-func (handler *APIHandler) GetCoreInstance(ctx context.Context, id *voltha.ID) (*voltha.CoreInstance, error) {
+func (handler *NBIHandler) GetCoreInstance(ctx context.Context, id *voltha.ID) (*voltha.CoreInstance, error) {
logger.Debugw("GetCoreInstance", log.Fields{"id": id})
//TODO: unused stub
return &voltha.CoreInstance{}, status.Errorf(codes.NotFound, "core-instance-%s", id.Id)
}
// GetLogicalDevicePort returns logical device port details
-func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+func (handler *NBIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
logger.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})
- return handler.logicalDeviceMgr.getLogicalPort(ctx, id)
+ return handler.logicalDeviceMgr.GetLogicalPort(ctx, id)
}
// EnableLogicalDevicePort enables logical device port
-func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+func (handler *NBIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
logger.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch := make(chan interface{})
defer close(ch)
- go handler.logicalDeviceMgr.enableLogicalPort(ctx, id, ch)
+ go handler.logicalDeviceMgr.EnableLogicalPort(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// DisableLogicalDevicePort disables logical device port
-func (handler *APIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+func (handler *NBIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
logger.Debugw("DisableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch := make(chan interface{})
defer close(ch)
- go handler.logicalDeviceMgr.disableLogicalPort(ctx, id, ch)
+ go handler.logicalDeviceMgr.DisableLogicalPort(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// UpdateLogicalDeviceFlowTable updates logical device flow table
-func (handler *APIHandler) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
+func (handler *NBIHandler) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceFlowTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
ch := make(chan interface{})
defer close(ch)
- go handler.logicalDeviceMgr.updateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
+ go handler.logicalDeviceMgr.UpdateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
-func (handler *APIHandler) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
+func (handler *NBIHandler) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceFlowGroupTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
ch := make(chan interface{})
defer close(ch)
- go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
+ go handler.logicalDeviceMgr.UpdateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// GetDevice must be implemented in the read-only containers - should it also be implemented here?
-func (handler *APIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+func (handler *NBIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
logger.Debugw("GetDevice-request", log.Fields{"id": id})
return handler.deviceMgr.GetDevice(ctx, id.Id)
}
@@ -168,7 +159,7 @@
// GetDevice must be implemented in the read-only containers - should it also be implemented here?
// ListDevices retrieves the latest devices from the data model
-func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
+func (handler *NBIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
logger.Debug("ListDevices")
devices, err := handler.deviceMgr.ListDevices(ctx)
if err != nil {
@@ -179,13 +170,13 @@
}
// ListDeviceIds returns the list of device ids managed by a voltha core
-func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+func (handler *NBIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
logger.Debug("ListDeviceIDs")
return handler.deviceMgr.ListDeviceIds()
}
//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
-func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+func (handler *NBIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
logger.Debug("ReconcileDevices")
ch := make(chan interface{})
@@ -195,43 +186,43 @@
}
// GetLogicalDevice provides a cloned most up to date logical device
-func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
+func (handler *NBIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
logger.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
- return handler.logicalDeviceMgr.getLogicalDevice(ctx, id.Id)
+ return handler.logicalDeviceMgr.GetLogicalDevice(ctx, id.Id)
}
// ListLogicalDevices returns the list of all logical devices
-func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
+func (handler *NBIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
logger.Debug("ListLogicalDevices-request")
- return handler.logicalDeviceMgr.listLogicalDevices(ctx)
+ return handler.logicalDeviceMgr.ListLogicalDevices(ctx)
}
// ListAdapters returns the contents of all adapters known to the system
-func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
+func (handler *NBIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
logger.Debug("ListAdapters")
- return handler.adapterMgr.listAdapters(ctx)
+ return handler.adapterMgr.ListAdapters(ctx)
}
// ListLogicalDeviceFlows returns the flows of logical device
-func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
+func (handler *NBIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("ListLogicalDeviceFlows", log.Fields{"id": *id})
return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
+func (handler *NBIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"id": *id})
return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
}
// ListLogicalDevicePorts returns ports of logical device
-func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+func (handler *NBIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
}
// CreateDevice creates a new parent device in the data model
-func (handler *APIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
+func (handler *NBIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
logger.Errorf("No Device Info Present")
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
@@ -240,7 +231,7 @@
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.createDevice(ctx, device, ch)
+ go handler.deviceMgr.CreateDevice(ctx, device, ch)
select {
case res := <-ch:
if res != nil {
@@ -262,47 +253,47 @@
}
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
-func (handler *APIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+func (handler *NBIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("enabledevice", log.Fields{"id": id})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.enableDevice(ctx, id, ch)
+ go handler.deviceMgr.EnableDevice(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// DisableDevice disables a device along with any child device it may have
-func (handler *APIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+func (handler *NBIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("disabledevice-request", log.Fields{"id": id})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.disableDevice(ctx, id, ch)
+ go handler.deviceMgr.DisableDevice(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
//RebootDevice invoked the reboot API to the corresponding adapter
-func (handler *APIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+func (handler *NBIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("rebootDevice-request", log.Fields{"id": id})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.rebootDevice(ctx, id, ch)
+ go handler.deviceMgr.RebootDevice(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// DeleteDevice removes a device from the data model
-func (handler *APIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+func (handler *NBIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
logger.Debugw("deletedevice-request", log.Fields{"id": id})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.deleteDevice(ctx, id, ch)
+ go handler.deviceMgr.DeleteDevice(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// ListDevicePorts returns the ports details for a specific device entry
-func (handler *APIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+func (handler *NBIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
logger.Debugw("listdeviceports-request", log.Fields{"id": id})
device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
@@ -314,7 +305,7 @@
}
// ListDeviceFlows returns the flow details for a specific device entry
-func (handler *APIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
+func (handler *NBIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
logger.Debugw("listdeviceflows-request", log.Fields{"id": id})
device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
@@ -327,7 +318,7 @@
}
// ListDeviceFlowGroups returns the flow group details for a specific device entry
-func (handler *APIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+func (handler *NBIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
logger.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
if device, _ := handler.deviceMgr.GetDevice(ctx, id.Id); device != nil {
@@ -337,36 +328,36 @@
}
// ListDeviceGroups returns all the device groups known to the system
-func (handler *APIHandler) ListDeviceGroups(ctx context.Context, empty *empty.Empty) (*voltha.DeviceGroups, error) {
+func (handler *NBIHandler) ListDeviceGroups(ctx context.Context, empty *empty.Empty) (*voltha.DeviceGroups, error) {
logger.Debug("ListDeviceGroups")
return &voltha.DeviceGroups{}, errors.New("UnImplemented")
}
// GetDeviceGroup returns a specific device group entry
-func (handler *APIHandler) GetDeviceGroup(ctx context.Context, id *voltha.ID) (*voltha.DeviceGroup, error) {
+func (handler *NBIHandler) GetDeviceGroup(ctx context.Context, id *voltha.ID) (*voltha.DeviceGroup, error) {
logger.Debug("GetDeviceGroup")
return &voltha.DeviceGroup{}, errors.New("UnImplemented")
}
// ListDeviceTypes returns all the device types known to the system
-func (handler *APIHandler) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
+func (handler *NBIHandler) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
logger.Debug("ListDeviceTypes")
- return &voltha.DeviceTypes{Items: handler.adapterMgr.listDeviceTypes()}, nil
+ return &voltha.DeviceTypes{Items: handler.adapterMgr.ListDeviceTypes()}, nil
}
// GetDeviceType returns the device type for a specific device entry
-func (handler *APIHandler) GetDeviceType(ctx context.Context, id *voltha.ID) (*voltha.DeviceType, error) {
+func (handler *NBIHandler) GetDeviceType(ctx context.Context, id *voltha.ID) (*voltha.DeviceType, error) {
logger.Debugw("GetDeviceType", log.Fields{"typeid": id})
- if deviceType := handler.adapterMgr.getDeviceType(id.Id); deviceType != nil {
+ if deviceType := handler.adapterMgr.GetDeviceType(id.Id); deviceType != nil {
return deviceType, nil
}
return &voltha.DeviceType{}, status.Errorf(codes.NotFound, "device_type-%s", id.Id)
}
// GetVoltha returns the contents of all components (i.e. devices, logical_devices, ...)
-func (handler *APIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
+func (handler *NBIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
logger.Debug("GetVoltha")
/*
@@ -389,7 +380,7 @@
}
// processImageRequest is a helper method to execute an image download request
-func (handler *APIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
+func (handler *NBIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
logger.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
@@ -398,13 +389,13 @@
defer close(ch)
switch requestType {
case ImageDownload:
- go handler.deviceMgr.downloadImage(ctx, img, ch)
+ go handler.deviceMgr.DownloadImage(ctx, img, ch)
case CancelImageDownload:
- go handler.deviceMgr.cancelImageDownload(ctx, img, ch)
+ go handler.deviceMgr.CancelImageDownload(ctx, img, ch)
case ActivateImage:
- go handler.deviceMgr.activateImage(ctx, img, ch)
+ go handler.deviceMgr.ActivateImage(ctx, img, ch)
case RevertImage:
- go handler.deviceMgr.revertImage(ctx, img, ch)
+ go handler.deviceMgr.RevertImage(ctx, img, ch)
default:
logger.Warn("invalid-request-type", log.Fields{"requestType": requestType})
return failedresponse, status.Errorf(codes.InvalidArgument, "%d", requestType)
@@ -428,39 +419,39 @@
}
// DownloadImage execute an image download request
-func (handler *APIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+func (handler *NBIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("DownloadImage-request", log.Fields{"img": *img})
return handler.processImageRequest(ctx, img, ImageDownload)
}
// CancelImageDownload cancels image download request
-func (handler *APIHandler) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+func (handler *NBIHandler) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("cancelImageDownload-request", log.Fields{"img": *img})
return handler.processImageRequest(ctx, img, CancelImageDownload)
}
// ActivateImageUpdate activates image update request
-func (handler *APIHandler) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+func (handler *NBIHandler) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("activateImageUpdate-request", log.Fields{"img": *img})
return handler.processImageRequest(ctx, img, ActivateImage)
}
// RevertImageUpdate reverts image update
-func (handler *APIHandler) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+func (handler *NBIHandler) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
logger.Debugw("revertImageUpdate-request", log.Fields{"img": *img})
return handler.processImageRequest(ctx, img, RevertImage)
}
// GetImageDownloadStatus returns status of image download
-func (handler *APIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (handler *NBIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
logger.Debugw("getImageDownloadStatus-request", log.Fields{"img": *img})
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.getImageDownloadStatus(ctx, img, ch)
+ go handler.deviceMgr.GetImageDownloadStatus(ctx, img, ch)
select {
case res := <-ch:
@@ -481,10 +472,10 @@
}
// GetImageDownload returns image download
-func (handler *APIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (handler *NBIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
logger.Debugw("GetImageDownload-request", log.Fields{"img": *img})
- download, err := handler.deviceMgr.getImageDownload(ctx, img)
+ download, err := handler.deviceMgr.GetImageDownload(ctx, img)
if err != nil {
return &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}, err
}
@@ -492,10 +483,10 @@
}
// ListImageDownloads returns image downloads
-func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+func (handler *NBIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
logger.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
- downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id)
+ downloads, err := handler.deviceMgr.ListImageDownloads(ctx, id.Id)
if err != nil {
failedResp := &voltha.ImageDownloads{
Items: []*voltha.ImageDownload{
@@ -508,7 +499,7 @@
}
// GetImages returns all images for a specific device entry
-func (handler *APIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+func (handler *NBIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
logger.Debugw("GetImages", log.Fields{"deviceid": id.Id})
device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
if err != nil {
@@ -518,67 +509,55 @@
}
// UpdateDevicePmConfigs updates the PM configs
-func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+func (handler *NBIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
logger.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.updatePmConfigs(ctx, configs, ch)
+ go handler.deviceMgr.UpdatePmConfigs(ctx, configs, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// ListDevicePmConfigs returns pm configs of device
-func (handler *APIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+func (handler *NBIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
logger.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
- return handler.deviceMgr.listPmConfigs(ctx, id.Id)
+ return handler.deviceMgr.ListPmConfigs(ctx, id.Id)
}
-func (handler *APIHandler) CreateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
+func (handler *NBIHandler) CreateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
logger.Debugw("CreateEventFilter-request", log.Fields{"filter": *filter})
return nil, errors.New("UnImplemented")
}
-func (handler *APIHandler) UpdateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
+func (handler *NBIHandler) UpdateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
logger.Debugw("UpdateEventFilter-request", log.Fields{"filter": *filter})
return nil, errors.New("UnImplemented")
}
-func (handler *APIHandler) DeleteEventFilter(ctx context.Context, filterInfo *voltha.EventFilter) (*empty.Empty, error) {
+func (handler *NBIHandler) DeleteEventFilter(ctx context.Context, filterInfo *voltha.EventFilter) (*empty.Empty, error) {
logger.Debugw("DeleteEventFilter-request", log.Fields{"device-id": filterInfo.DeviceId, "filter-id": filterInfo.Id})
return nil, errors.New("UnImplemented")
}
// GetEventFilter returns all the filters present for a device
-func (handler *APIHandler) GetEventFilter(ctx context.Context, id *voltha.ID) (*voltha.EventFilters, error) {
+func (handler *NBIHandler) GetEventFilter(ctx context.Context, id *voltha.ID) (*voltha.EventFilters, error) {
logger.Debugw("GetEventFilter-request", log.Fields{"device-id": id})
return nil, errors.New("UnImplemented")
}
// ListEventFilters returns all the filters known to the system
-func (handler *APIHandler) ListEventFilters(ctx context.Context, empty *empty.Empty) (*voltha.EventFilters, error) {
+func (handler *NBIHandler) ListEventFilters(ctx context.Context, empty *empty.Empty) (*voltha.EventFilters, error) {
logger.Debug("ListEventFilter-request")
return nil, errors.New("UnImplemented")
}
-func (handler *APIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
+func (handler *NBIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
logger.Debugw("SelfTest-request", log.Fields{"id": id})
return &voltha.SelfTestResponse{}, errors.New("UnImplemented")
}
-func (handler *APIHandler) forwardPacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
- logger.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
- //TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
- // request. For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
- // let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
- if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
- agent.packetOut(ctx, packet.PacketOut)
- } else {
- logger.Errorf("No logical device agent present", log.Fields{"logicaldeviceID": packet.Id})
- }
-}
-
// StreamPacketsOut sends packets to adapter
-func (handler *APIHandler) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
+func (handler *NBIHandler) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
logger.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
loop:
for {
@@ -601,17 +580,17 @@
continue
}
- handler.forwardPacketOut(packets.Context(), packet)
+ handler.logicalDeviceMgr.PacketOut(packets.Context(), packet)
}
logger.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
return nil
}
-func (handler *APIHandler) sendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
+func (handler *NBIHandler) SendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
// TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
- logger.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
+ logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
handler.packetInQueue <- packetIn
}
@@ -625,7 +604,7 @@
var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
-func (handler *APIHandler) getStreamingTracker(method string, done chan<- bool) *callTracker {
+func (handler *NBIHandler) getStreamingTracker(method string, done chan<- bool) *callTracker {
streamingTracker.Lock()
defer streamingTracker.Unlock()
if _, ok := streamingTracker.calls[method]; ok {
@@ -639,7 +618,7 @@
return streamingTracker.calls[method]
}
-func (handler *APIHandler) flushFailedPackets(tracker *callTracker) error {
+func (handler *NBIHandler) flushFailedPackets(tracker *callTracker) error {
if tracker.failedPacket != nil {
switch tracker.failedPacket.(type) {
case openflow_13.PacketIn:
@@ -654,7 +633,7 @@
}
// ReceivePacketsIn receives packets from adapter
-func (handler *APIHandler) ReceivePacketsIn(empty *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
+func (handler *NBIHandler) ReceivePacketsIn(empty *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
var streamingTracker = handler.getStreamingTracker("ReceivePacketsIn", handler.packetInQueueDone)
logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
@@ -690,17 +669,17 @@
return nil
}
-func (handler *APIHandler) sendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
+func (handler *NBIHandler) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
// TODO: validate the type of portStatus parameter
//if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
//}
event := openflow_13.ChangeEvent{Id: deviceID, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
- logger.Debugw("sendChangeEvent", log.Fields{"event": event})
+ logger.Debugw("SendChangeEvent", log.Fields{"event": event})
handler.changeEventQueue <- event
}
// ReceiveChangeEvents receives change in events
-func (handler *APIHandler) ReceiveChangeEvents(empty *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
+func (handler *NBIHandler) ReceiveChangeEvents(empty *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
var streamingTracker = handler.getStreamingTracker("ReceiveChangeEvents", handler.changeEventQueueDone)
logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
@@ -734,8 +713,12 @@
return nil
}
+func (handler *NBIHandler) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
+ return handler.changeEventQueue
+}
+
// Subscribe subscribing request of ofagent
-func (handler *APIHandler) Subscribe(
+func (handler *NBIHandler) Subscribe(
ctx context.Context,
ofAgent *voltha.OfAgentSubscriber,
) (*voltha.OfAgentSubscriber, error) {
@@ -744,32 +727,32 @@
}
// GetAlarmDeviceData @TODO useless stub, what should this actually do?
-func (handler *APIHandler) GetAlarmDeviceData(ctx context.Context, in *common.ID) (*omci.AlarmDeviceData, error) {
+func (handler *NBIHandler) GetAlarmDeviceData(ctx context.Context, in *common.ID) (*omci.AlarmDeviceData, error) {
logger.Debug("GetAlarmDeviceData-stub")
return &omci.AlarmDeviceData{}, errors.New("UnImplemented")
}
// ListLogicalDeviceMeters returns logical device meters
-func (handler *APIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+func (handler *NBIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
logger.Debugw("ListLogicalDeviceMeters", log.Fields{"id": *id})
return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
}
// GetMeterStatsOfLogicalDevice @TODO useless stub, what should this actually do?
-func (handler *APIHandler) GetMeterStatsOfLogicalDevice(ctx context.Context, in *common.ID) (*openflow_13.MeterStatsReply, error) {
+func (handler *NBIHandler) GetMeterStatsOfLogicalDevice(ctx context.Context, in *common.ID) (*openflow_13.MeterStatsReply, error) {
logger.Debug("GetMeterStatsOfLogicalDevice")
return &openflow_13.MeterStatsReply{}, errors.New("UnImplemented")
}
// GetMibDeviceData @TODO useless stub, what should this actually do?
-func (handler *APIHandler) GetMibDeviceData(ctx context.Context, in *common.ID) (*omci.MibDeviceData, error) {
+func (handler *NBIHandler) GetMibDeviceData(ctx context.Context, in *common.ID) (*omci.MibDeviceData, error) {
logger.Debug("GetMibDeviceData")
return &omci.MibDeviceData{}, errors.New("UnImplemented")
}
// SimulateAlarm sends simulate alarm request
-func (handler *APIHandler) SimulateAlarm(
+func (handler *NBIHandler) SimulateAlarm(
ctx context.Context,
in *voltha.SimulateAlarmRequest,
) (*common.OperationResp, error) {
@@ -777,48 +760,48 @@
successResp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.simulateAlarm(ctx, in, ch)
+ go handler.deviceMgr.SimulateAlarm(ctx, in, ch)
return successResp, nil
}
// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
-func (handler *APIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+func (handler *NBIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
logger.Debugw("UpdateLogicalDeviceMeterTable-request",
log.Fields{"meter": meter, "test": common.TestModeKeys_api_test.String()})
ch := make(chan interface{})
defer close(ch)
- go handler.logicalDeviceMgr.updateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
+ go handler.logicalDeviceMgr.UpdateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
// GetMembership returns membership
-func (handler *APIHandler) GetMembership(context.Context, *empty.Empty) (*voltha.Membership, error) {
+func (handler *NBIHandler) GetMembership(context.Context, *empty.Empty) (*voltha.Membership, error) {
return &voltha.Membership{}, errors.New("UnImplemented")
}
// UpdateMembership updates membership
-func (handler *APIHandler) UpdateMembership(context.Context, *voltha.Membership) (*empty.Empty, error) {
+func (handler *NBIHandler) UpdateMembership(context.Context, *voltha.Membership) (*empty.Empty, error) {
return &empty.Empty{}, errors.New("UnImplemented")
}
-func (handler *APIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+func (handler *NBIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("EnablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.enablePort(ctx, port, ch)
+ go handler.deviceMgr.EnablePort(ctx, port, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
-func (handler *APIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+func (handler *NBIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("DisablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
ch := make(chan interface{})
defer close(ch)
- go handler.deviceMgr.disablePort(ctx, port, ch)
+ go handler.deviceMgr.DisablePort(ctx, port, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
-func (handler *APIHandler) StartOmciTestAction(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+func (handler *NBIHandler) StartOmciTestAction(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
logger.Debugw("Omci_test_Request", log.Fields{"id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
- return handler.deviceMgr.startOmciTest(ctx, omcitestrequest)
+ return handler.deviceMgr.StartOmciTest(ctx, omcitestrequest)
}
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
similarity index 89%
rename from rw_core/core/grpc_nbi_api_handler_test.go
rename to rw_core/core/api/grpc_nbi_handler_test.go
index e54c14c..0579f94 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -13,13 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package api
import (
"context"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"math/rand"
"os"
"runtime"
@@ -30,8 +29,13 @@
"time"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device"
cm "github.com/opencord/voltha-go/rw_core/mocks"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
@@ -45,9 +49,16 @@
"google.golang.org/grpc/status"
)
+const (
+ coreName = "rw_core"
+)
+
type NBTest struct {
etcdServer *mock_etcd.EtcdServer
- core *Core
+ deviceMgr *device.Manager
+ logicalDeviceMgr *device.LogicalManager
+ adapterMgr *adapter.Manager
+ kmp kafka.InterContainerProxy
kClient kafka.Client
kvClientPort int
numONUPerOLT int
@@ -96,10 +107,40 @@
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, nb.coreInstanceID)
- nb.core = NewCore(ctx, nb.coreInstanceID, cfg, client, nb.kClient)
- err = nb.core.Start(context.Background())
- if err != nil {
- logger.Fatal("Cannot start core")
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Host: cfg.KVStoreHost,
+ Port: cfg.KVStorePort,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+ PathPrefix: cfg.KVStoreDataPrefix}
+ nb.kmp = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.MsgClient(nb.kClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewProxy(backend, "/")
+ nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewDeviceManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ if err = nb.adapterMgr.Start(ctx); err != nil {
+ logger.Fatalf("Cannot start adapterMgr: %s", err)
+ }
+ nb.deviceMgr.Start(ctx)
+ nb.logicalDeviceMgr.Start(ctx)
+
+ if err = nb.kmp.Start(); err != nil {
+ logger.Fatalf("Cannot start InterContainerProxy: %s", err)
+ }
+ requestProxy := NewAdapterRequestHandlerProxy(nb.coreInstanceID, nb.deviceMgr, nb.adapterMgr, proxy, proxy, cfg.LongRunningRequestTimeout, cfg.DefaultRequestTimeout)
+ if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
+ logger.Fatalf("Cannot add request handler: %s", err)
+ }
+ if err := nb.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf("Cannot add default request handler: %s", err)
}
}
@@ -125,7 +166,7 @@
}
types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
- if _, err := nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes); err != nil {
+ if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
assert.NotNil(t, err)
}
@@ -149,7 +190,7 @@
}
types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes = &voltha.DeviceTypes{Items: types}
- if _, err := nb.core.adapterMgr.registerAdapter(registrationData, deviceTypes); err != nil {
+ if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
assert.NotNil(t, err)
}
@@ -159,15 +200,21 @@
if nb.kClient != nil {
nb.kClient.Stop()
}
- if nb.core != nil {
- nb.core.Stop(context.Background())
+ if nb.logicalDeviceMgr != nil {
+ nb.logicalDeviceMgr.Stop(context.Background())
+ }
+ if nb.deviceMgr != nil {
+ nb.deviceMgr.Stop(context.Background())
+ }
+ if nb.kmp != nil {
+ nb.kmp.Stop()
}
if nb.etcdServer != nil {
stopEmbeddedEtcdServer(nb.etcdServer)
}
}
-func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi *APIHandler) {
+func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi *NBIHandler) {
// Get the latest set of logical devices
logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
assert.Nil(t, err)
@@ -207,7 +254,7 @@
}
}
-func (nb *NBTest) verifyDevices(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) verifyDevices(t *testing.T, nbi *NBIHandler) {
// Get the latest set of devices
devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
assert.Nil(t, err)
@@ -273,7 +320,7 @@
wg.Wait()
}
-func (nb *NBTest) getADevice(rootDevice bool, nbi *APIHandler) (*voltha.Device, error) {
+func (nb *NBTest) getADevice(rootDevice bool, nbi *NBIHandler) (*voltha.Device, error) {
devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
if err != nil {
return nil, err
@@ -286,7 +333,7 @@
return nil, status.Errorf(codes.NotFound, "%v device not found", rootDevice)
}
-func (nb *NBTest) testCoreWithoutData(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testCoreWithoutData(t *testing.T, nbi *NBIHandler) {
lds, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
assert.Nil(t, err)
assert.NotNil(t, lds)
@@ -301,7 +348,7 @@
assert.NotNil(t, adapters)
}
-func (nb *NBTest) testAdapterRegistration(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testAdapterRegistration(t *testing.T, nbi *NBIHandler) {
adapters, err := nbi.ListAdapters(getContext(), &empty.Empty{})
assert.Nil(t, err)
assert.NotNil(t, adapters)
@@ -336,7 +383,7 @@
}
}
-func (nb *NBTest) testCreateDevice(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testCreateDevice(t *testing.T, nbi *NBIHandler) {
// Create a valid device
oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
assert.Nil(t, err)
@@ -375,7 +422,7 @@
assert.Nil(t, err)
}
-func (nb *NBTest) testEnableDevice(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testEnableDevice(t *testing.T, nbi *NBIHandler) {
// Create a device that has no adapter registered
oltDeviceNoAdapter, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegistered", MacAddress: "aa:bb:cc:cc:ee:ff"})
assert.Nil(t, err)
@@ -438,7 +485,7 @@
wg.Wait()
}
-func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *NBIHandler) {
//Get an OLT device
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
@@ -456,7 +503,7 @@
assert.Nil(t, err)
// Verify that all onu devices are disabled as well
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
@@ -491,7 +538,7 @@
assert.Nil(t, err)
// Verify that all onu devices are enabled as well
- onuDevices, err = nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err = nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
@@ -515,7 +562,7 @@
assert.Nil(t, err)
}
-func (nb *NBTest) testDisableAndDeleteAllDevice(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testDisableAndDeleteAllDevice(t *testing.T, nbi *NBIHandler) {
//Get an OLT device
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
@@ -533,7 +580,7 @@
assert.Nil(t, err)
// Verify that all onu devices are disabled as well
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
for _, onu := range onuDevices.Items {
err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
@@ -558,7 +605,7 @@
err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
assert.Nil(t, err)
}
-func (nb *NBTest) testEnableAndDeleteAllDevice(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testEnableAndDeleteAllDevice(t *testing.T, nbi *NBIHandler) {
//Create the device with valid data
oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
assert.Nil(t, err)
@@ -581,7 +628,7 @@
assert.Nil(t, err)
//Get all child devices
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
// Wait for the all onu devices to be enabled
@@ -626,7 +673,7 @@
err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunc)
assert.Nil(t, err)
}
-func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi *NBIHandler) {
//Get an OLT device
var cp *voltha.Port
oltDevice, err := nb.getADevice(true, nbi)
@@ -719,7 +766,7 @@
}
-func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi *NBIHandler) {
//Get an OLT device
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
@@ -728,7 +775,7 @@
assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
// Verify that we have one or more ONUs to start with
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
assert.NotNil(t, onuDevices)
assert.Greater(t, len(onuDevices.Items), 0)
@@ -786,8 +833,7 @@
// Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
// Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
- deviceAgent := nbi.deviceMgr.getDeviceAgent(getContext(), oltDevice.Id)
- err = deviceAgent.updateDeviceStatus(getContext(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+ err = nbi.deviceMgr.UpdateDeviceStatus(getContext(), oltDevice.Id, voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
assert.Nil(t, err)
// Verify the device connection and operation states
@@ -811,13 +857,13 @@
assert.Equal(t, 1, len(logicalDevices.Items))
// Verify that we have no ONUs left
- onuDevices, err = nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err = nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
assert.NotNil(t, onuDevices)
assert.Equal(t, 0, len(onuDevices.Items))
}
-func (nb *NBTest) testStartOmciTestAction(t *testing.T, nbi *APIHandler) {
+func (nb *NBTest) testStartOmciTestAction(t *testing.T, nbi *NBIHandler) {
// -----------------------------------------------------------------------
// SubTest 1: Omci test action should fail due to nonexistent device id
@@ -883,7 +929,7 @@
err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
- onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
assert.Nil(t, err)
assert.Greater(t, len(onuDevices.Items), 0)
@@ -918,7 +964,7 @@
return uint64(md | (port & 0xFFFFFFFF))
}
-func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int) {
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int) {
expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
// Wait for logical device to have all the flows
var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
@@ -929,7 +975,7 @@
assert.Nil(t, err)
}
-func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *APIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *NBIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
// Send flows for the parent device
var nniPorts []*voltha.LogicalPort
var uniPorts []*voltha.LogicalPort
@@ -997,7 +1043,7 @@
return len(nniPorts), len(uniPorts)
}
-func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *APIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *NBIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
maxInt32 := uint64(0xFFFFFFFF)
controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
fa := &flows.FlowArgs{
@@ -1016,11 +1062,9 @@
assert.Nil(t, err)
}
-func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *APIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
defer wg.Done()
- if nb.core.logicalDeviceMgr.grpcNbiHdlr != nbi {
- nb.core.logicalDeviceMgr.setGrpcNbiHandler(nbi)
- }
+ nb.logicalDeviceMgr.SetEventCallbacks(nbi)
// Clear any existing flows on the adapters
nb.oltAdapter.ClearFlows()
@@ -1118,7 +1162,7 @@
}
func TestSuiteNbiApiHandler(t *testing.T) {
- f, err := os.Create("profile.cpu")
+ f, err := os.Create("../../../tests/results/profile.cpu")
if err != nil {
logger.Fatalf("could not create CPU profile: %v\n ", err)
}
@@ -1141,7 +1185,7 @@
nb.startCore(false)
// Set the grpc API interface - no grpc server is running in unit test
- nbi := NewAPIHandler(nb.core)
+ nbi := NewAPIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
// 1. Basic test with no data in Core
nb.testCoreWithoutData(t, nbi)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 5043d47..de126a2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -23,6 +23,9 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/api"
+ "github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
@@ -38,11 +41,11 @@
// Core represent read,write core attributes
type Core struct {
instanceID string
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
+ deviceMgr *device.Manager
+ logicalDeviceMgr *device.LogicalManager
grpcServer *grpcserver.GrpcServer
- grpcNBIAPIHandler *APIHandler
- adapterMgr *AdapterManager
+ grpcNBIAPIHandler *api.NBIHandler
+ adapterMgr *adapter.Manager
config *config.RWCoreFlags
kmp kafka.InterContainerProxy
clusterDataProxy *model.Proxy
@@ -108,6 +111,8 @@
p.UpdateStatus("kv-store", probe.ServiceStatusRunning)
}
+ endpointMgr := kafka.NewEndpointManager(&core.backend)
+
core.clusterDataProxy = model.NewProxy(&core.backend, "/")
core.localDataProxy = model.NewProxy(&core.backend, "/")
@@ -116,10 +121,8 @@
core.initKafkaManager(ctx)
logger.Debugw("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = newDeviceManager(core)
- core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient, core.deviceMgr)
- core.deviceMgr.adapterMgr = core.adapterMgr
- core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
+ core.adapterMgr = adapter.NewAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient)
+ core.deviceMgr, core.logicalDeviceMgr = device.NewDeviceManagers(core.clusterDataProxy, core.adapterMgr, core.kmp, endpointMgr, core.config.CorePairTopic, core.instanceID, core.config.DefaultCoreTimeout)
// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
// logicalDeviceMgr have been created, as once the kmp is started, it will register
@@ -151,10 +154,10 @@
core.grpcServer.Stop()
}
if core.logicalDeviceMgr != nil {
- core.logicalDeviceMgr.stop(ctx)
+ core.logicalDeviceMgr.Stop(ctx)
}
if core.deviceMgr != nil {
- core.deviceMgr.stop(ctx)
+ core.deviceMgr.Stop(ctx)
}
if core.kmp != nil {
core.kmp.Stop()
@@ -169,9 +172,9 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
logger.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core)
+ core.grpcNBIAPIHandler = api.NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr)
logger.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
- core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
+ core.logicalDeviceMgr.SetEventCallbacks(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
voltha.RegisterVolthaServiceServer(
@@ -355,10 +358,10 @@
return nil
}
-func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *DeviceManager,
- ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceID string, dMgr *device.Manager,
+ ldMgr *device.LogicalManager, aMgr *adapter.Manager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
- requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceID, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ requestProxy := api.NewAdapterRequestHandlerProxy(coreInstanceID, dMgr, aMgr, cdProxy, ldProxy,
core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
// Register the broadcast topic to handle any core-bound broadcast requests
@@ -379,19 +382,19 @@
func (core *Core) startDeviceManager(ctx context.Context) {
logger.Info("DeviceManager-Starting...")
- core.deviceMgr.start(ctx, core.logicalDeviceMgr)
+ core.deviceMgr.Start(ctx)
logger.Info("DeviceManager-Started")
}
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
logger.Info("Logical-DeviceManager-Starting...")
- core.logicalDeviceMgr.start(ctx)
+ core.logicalDeviceMgr.Start(ctx)
logger.Info("Logical-DeviceManager-Started")
}
func (core *Core) startAdapterManager(ctx context.Context) {
logger.Info("Adapter-Manager-Starting...")
- err := core.adapterMgr.start(ctx)
+ err := core.adapterMgr.Start(ctx)
if err != nil {
logger.Fatalf("failed-to-start-adapter-manager: error %v ", err)
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device/agent.go
similarity index 86%
rename from rw_core/core/device_agent.go
rename to rw_core/core/device/agent.go
index dccd271..8d18e10 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device/agent.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package device
import (
"context"
@@ -22,6 +22,8 @@
"errors"
"fmt"
"github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"reflect"
"sync"
@@ -39,15 +41,15 @@
"google.golang.org/grpc/status"
)
-// DeviceAgent represents device agent attributes
-type DeviceAgent struct {
+// Agent represents device agent attributes
+type Agent struct {
deviceID string
parentID string
deviceType string
isRootdevice bool
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- deviceMgr *DeviceManager
+ adapterProxy *remote.AdapterProxy
+ adapterMgr *adapter.Manager
+ deviceMgr *Manager
clusterDataProxy *model.Proxy
exitChannel chan int
device *voltha.Device
@@ -58,12 +60,12 @@
stopped bool
}
-//newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *DeviceAgent {
- var agent DeviceAgent
+//newAgent creates a new device agent. The device will be initialized when start() is called.
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
+ var agent Agent
agent.adapterProxy = ap
if device.Id == "" {
- agent.deviceID = CreateDeviceID()
+ agent.deviceID = coreutils.CreateDeviceID()
} else {
agent.deviceID = device.Id
}
@@ -84,7 +86,7 @@
// start() saves the device to the data model and registers for callbacks on that device if deviceToCreate!=nil.
// Otherwise, it will load the data from the dB and setup the necessary callbacks and proxies. Returns the device that
// was started.
-func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
+func (agent *Agent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
needToStart := false
if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
return agent.getDevice(ctx)
@@ -117,7 +119,7 @@
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are unitialized since this
// is a new device, so populate them here before passing the device to clusterDataProxy.AddWithId.
- // agent.deviceId will also have been set during newDeviceAgent().
+ // agent.deviceId will also have been set during newAgent().
device = (proto.Clone(deviceToCreate)).(*voltha.Device)
device.Id = agent.deviceID
device.AdminState = voltha.AdminState_PREPROVISIONED
@@ -143,7 +145,7 @@
}
// stop stops the device agent. Not much to do for now
-func (agent *DeviceAgent) stop(ctx context.Context) error {
+func (agent *Agent) stop(ctx context.Context) error {
needToStop := false
if agent.stopOnce.Do(func() { needToStop = true }); !needToStop {
return nil
@@ -170,7 +172,7 @@
}
// Load the most recent state from the KVStore for the device.
-func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
+func (agent *Agent) reconcileWithKVStore(ctx context.Context) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
return
@@ -193,14 +195,14 @@
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
// and the only action required is to publish a successful result on kafka
-func (agent *DeviceAgent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
logger.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
// TODO: Post success message onto kafka
}
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
-func (agent *DeviceAgent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
if res, ok := response.(error); ok {
logger.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
} else {
@@ -209,7 +211,7 @@
// TODO: Post failure message onto kafka
}
-func (agent *DeviceAgent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
select {
@@ -227,7 +229,7 @@
}
// getDevice returns the device data from cache
-func (agent *DeviceAgent) getDevice(ctx context.Context) (*voltha.Device, error) {
+func (agent *Agent) getDevice(ctx context.Context) (*voltha.Device, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -236,12 +238,12 @@
}
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *DeviceAgent) getDeviceWithoutLock() *voltha.Device {
+func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
return proto.Clone(agent.device).(*voltha.Device)
}
// enableDevice activates a preprovisioned or a disable device
-func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
+func (agent *Agent) enableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -254,7 +256,7 @@
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.getAdapterType(cloned.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
if err != nil {
return err
}
@@ -285,9 +287,9 @@
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
if previousAdminState == voltha.AdminState_PREPROVISIONED {
- ch, err = agent.adapterProxy.adoptDevice(subCtx, device)
+ ch, err = agent.adapterProxy.AdoptDevice(subCtx, device)
} else {
- ch, err = agent.adapterProxy.reEnableDevice(subCtx, device)
+ ch, err = agent.adapterProxy.ReEnableDevice(subCtx, device)
}
if err != nil {
cancel()
@@ -298,7 +300,7 @@
return nil
}
-func (agent *DeviceAgent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
defer cancel()
select {
case rpcResponse, ok := <-ch:
@@ -373,7 +375,7 @@
return newGroups, groupsToDelete, updatedAllGroups
}
-func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *Agent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
if (len(newFlows) | len(newGroups)) == 0 {
@@ -387,7 +389,7 @@
defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
- dType := agent.adapterMgr.getDeviceType(device.Type)
+ dType := agent.adapterMgr.GetDeviceType(device.Type)
if dType == nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -423,7 +425,7 @@
cancel()
return coreutils.DoneResponse(), nil
}
- rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -439,7 +441,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -451,7 +453,7 @@
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
if err != nil {
return err
@@ -463,7 +465,7 @@
return nil
}
-func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *Agent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -477,7 +479,7 @@
defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
- dType := agent.adapterMgr.getDeviceType(device.Type)
+ dType := agent.adapterMgr.GetDeviceType(device.Type)
if dType == nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -533,7 +535,7 @@
cancel()
return coreutils.DoneResponse(), nil
}
- rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -549,7 +551,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -561,7 +563,7 @@
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
if err != nil {
return err
@@ -572,7 +574,7 @@
return nil
}
-func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *Agent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -589,7 +591,7 @@
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
- dType := agent.adapterMgr.getDeviceType(device.Type)
+ dType := agent.adapterMgr.GetDeviceType(device.Type)
if dType == nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -620,7 +622,7 @@
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.updateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -681,7 +683,7 @@
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
}
- rpcResponse, err := agent.adapterProxy.updateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -694,7 +696,7 @@
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
if err != nil {
return err
@@ -706,7 +708,7 @@
}
//deleteAllFlows deletes all flows in the device table
-func (agent *DeviceAgent) deleteAllFlows(ctx context.Context) error {
+func (agent *Agent) deleteAllFlows(ctx context.Context) error {
logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -724,7 +726,7 @@
}
//disableDevice disable a device
-func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
+func (agent *Agent) disableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -748,7 +750,7 @@
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.disableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+ ch, err := agent.adapterProxy.DisableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
if err != nil {
cancel()
return err
@@ -758,7 +760,7 @@
return nil
}
-func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
+func (agent *Agent) rebootDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -767,7 +769,7 @@
device := agent.getDeviceWithoutLock()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.rebootDevice(subCtx, device)
+ ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
return err
@@ -776,7 +778,7 @@
return nil
}
-func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
+func (agent *Agent) deleteDevice(ctx context.Context) error {
logger.Debugw("deleteDevice", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -797,7 +799,7 @@
// adapter
if previousState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.deleteDevice(subCtx, cloned)
+ ch, err := agent.adapterProxy.DeleteDevice(subCtx, cloned)
if err != nil {
cancel()
return err
@@ -807,7 +809,7 @@
return nil
}
-func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
+func (agent *Agent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -825,7 +827,7 @@
return nil
}
-func (agent *DeviceAgent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -840,7 +842,7 @@
}
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.updatePmConfigs(subCtx, cloned, pmConfigs)
+ ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
if err != nil {
cancel()
return err
@@ -849,7 +851,7 @@
return nil
}
-func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -862,7 +864,7 @@
return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
}
-func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -872,7 +874,7 @@
return agent.getDeviceWithoutLock().PmConfigs, nil
}
-func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -910,7 +912,7 @@
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.downloadImage(ctx, cloned, clonedImg)
+ ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
if err != nil {
cancel()
return nil, err
@@ -930,7 +932,7 @@
return false
}
-func (agent *DeviceAgent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -958,7 +960,7 @@
return nil, err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.cancelImageDownload(subCtx, device, img)
+ ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
if err != nil {
cancel()
return nil, err
@@ -968,7 +970,7 @@
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
-func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -996,7 +998,7 @@
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.activateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
if err != nil {
cancel()
return nil, err
@@ -1008,7 +1010,7 @@
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
-func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -1037,7 +1039,7 @@
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.revertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
if err != nil {
cancel()
return nil, err
@@ -1047,14 +1049,14 @@
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
-func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
device := agent.getDeviceWithoutLock()
- ch, err := agent.adapterProxy.getImageDownloadStatus(ctx, device, img)
+ ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
agent.requestQueue.RequestComplete()
if err != nil {
return nil, err
@@ -1075,7 +1077,7 @@
return imgDownload, nil
}
-func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
+func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1103,7 +1105,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -1119,7 +1121,7 @@
return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
}
-func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
+func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -1130,7 +1132,7 @@
}
// getPorts retrieves the ports information of the device based on the port type.
-func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
@@ -1144,14 +1146,14 @@
}
// getSwitchCapability retrieves the switch capability of a parent device
-func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
+func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
logger.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
cloned, err := agent.getDevice(ctx)
if err != nil {
return nil, err
}
- ch, err := agent.adapterProxy.getOfpDeviceInfo(ctx, cloned)
+ ch, err := agent.adapterProxy.GetOfpDeviceInfo(ctx, cloned)
if err != nil {
return nil, err
}
@@ -1173,13 +1175,13 @@
}
// getPortCapability retrieves the port capability of a device
-func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
+func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDevice(ctx)
if err != nil {
return nil, err
}
- ch, err := agent.adapterProxy.getOfpPortInfo(ctx, device, portNo)
+ ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
if err != nil {
return nil, err
}
@@ -1199,7 +1201,7 @@
return portCap, nil
}
-func (agent *DeviceAgent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
// packet data is encoded in the args param as the first parameter
var packet []byte
if len(args) >= 1 {
@@ -1218,7 +1220,7 @@
})
}
-func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
+func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
// If deviceType=="" then we must have taken ownership of this device.
// Fixes VOL-2226 where a core would take ownership and have stale data
if agent.deviceType == "" {
@@ -1226,7 +1228,7 @@
}
// Send packet to adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.packetOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
+ ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
if err != nil {
cancel()
return nil
@@ -1237,7 +1239,7 @@
// updatePartialDeviceData updates a subset of a device that an Adapter can update.
// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *DeviceAgent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
+func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
cloned := agent.getDeviceWithoutLock()
cloned.Root = device.Root
cloned.Vendor = device.Vendor
@@ -1249,7 +1251,7 @@
return cloned, nil
}
-func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
+func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1264,14 +1266,14 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
+func (agent *Agent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
logger.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
//cloned := proto.Clone(device).(*voltha.Device)
cloned := device
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1294,7 +1296,7 @@
return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
}
-func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -1308,7 +1310,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1331,7 +1333,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
+func (agent *Agent) deleteAllPorts(ctx context.Context) error {
logger.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -1356,7 +1358,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
+func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1395,7 +1397,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
+func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1433,7 +1435,7 @@
}
// TODO: A generic device update by attribute
-func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
+func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
return
@@ -1471,7 +1473,7 @@
}
}
-func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
+func (agent *Agent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1481,7 +1483,7 @@
cloned := agent.getDeviceWithoutLock()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.simulateAlarm(subCtx, cloned, simulatereq)
+ ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulatereq)
if err != nil {
cancel()
return err
@@ -1490,7 +1492,7 @@
return nil
}
-func (agent *DeviceAgent) updateDeviceStateInStoreWithoutLock(
+func (agent *Agent) updateDeviceStateInStoreWithoutLock(
ctx context.Context,
device *voltha.Device,
adminState voltha.AdminState_Types,
@@ -1515,7 +1517,7 @@
//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
// It is an internal helper function.
-func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+func (agent *Agent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
if agent.stopped {
return errors.New("device agent stopped")
}
@@ -1530,7 +1532,7 @@
return nil
}
-func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
+func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1543,7 +1545,7 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *DeviceAgent) disablePort(ctx context.Context, Port *voltha.Port) error {
+func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1575,7 +1577,7 @@
//send request to adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.disablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
if err != nil {
cancel()
return err
@@ -1584,7 +1586,7 @@
return nil
}
-func (agent *DeviceAgent) enablePort(ctx context.Context, Port *voltha.Port) error {
+func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1616,7 +1618,7 @@
}
//send request to adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.enablePort(ctx, device, cp)
+ ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
if err != nil {
cancel()
return err
@@ -1625,7 +1627,7 @@
return nil
}
-func (agent *DeviceAgent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
+func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1651,7 +1653,7 @@
//send request to adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.childDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
+ ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
if err != nil {
cancel()
return err
@@ -1660,7 +1662,7 @@
return nil
}
-func (agent *DeviceAgent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -1668,17 +1670,16 @@
device := agent.getDeviceWithoutLock()
if device.Adapter == "" {
- adapterName, err := agent.adapterMgr.getAdapterType(device.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
if err != nil {
agent.requestQueue.RequestComplete()
return nil, err
}
-
device.Adapter = adapterName
}
// Send request to the adapter
- ch, err := agent.adapterProxy.startOmciTest(ctx, device, omcitestrequest)
+ ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
agent.requestQueue.RequestComplete()
if err != nil {
return nil, err
diff --git a/rw_core/core/device_agent_test.go b/rw_core/core/device/agent_test.go
similarity index 82%
rename from rw_core/core/device_agent_test.go
rename to rw_core/core/device/agent_test.go
index f8fb810..ce69599 100755
--- a/rw_core/core/device_agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -13,13 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package device
import (
"context"
"github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
@@ -27,8 +31,11 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"math/rand"
"sort"
+ "strconv"
"strings"
"sync"
"testing"
@@ -36,17 +43,19 @@
)
type DATest struct {
- etcdServer *mock_etcd.EtcdServer
- core *Core
- kClient kafka.Client
- kvClientPort int
- oltAdapterName string
- onuAdapterName string
- coreInstanceID string
- defaultTimeout time.Duration
- maxTimeout time.Duration
- device *voltha.Device
- done chan int
+ etcdServer *mock_etcd.EtcdServer
+ deviceMgr *Manager
+ logicalDeviceMgr *LogicalManager
+ kmp kafka.InterContainerProxy
+ kClient kafka.Client
+ kvClientPort int
+ oltAdapterName string
+ onuAdapterName string
+ coreInstanceID string
+ defaultTimeout time.Duration
+ maxTimeout time.Duration
+ device *voltha.Device
+ done chan int
}
func newDATest() *DATest {
@@ -94,10 +103,14 @@
OperStatus: voltha.OperStatus_ACTIVE},
},
}
-
return test
}
+type fakeEventCallbacks struct{}
+
+func (fakeEventCallbacks) SendChangeEvent(_ string, _ *ofp.OfpPortStatus) {}
+func (fakeEventCallbacks) SendPacketIn(_ string, _ string, _ *ofp.OfpPacketIn) {}
+
func (dat *DATest) startCore(inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
cfg.CorePairTopic = "rw_core"
@@ -110,31 +123,92 @@
}
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
- setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, dat.coreInstanceID)
- dat.core = NewCore(context.Background(), dat.coreInstanceID, cfg, client, dat.kClient)
- err = dat.core.Start(context.Background())
- if err != nil {
- logger.Fatal("Cannot start core")
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Host: cfg.KVStoreHost,
+ Port: cfg.KVStorePort,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+ PathPrefix: cfg.KVStoreDataPrefix}
+ dat.kmp = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.MsgClient(dat.kClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewProxy(backend, "/")
+ adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+
+ dat.deviceMgr, dat.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+ if err = dat.kmp.Start(); err != nil {
+ logger.Fatal("Cannot start InterContainerProxy")
}
+ if err = adapterMgr.Start(context.Background()); err != nil {
+ logger.Fatal("Cannot start adapterMgr")
+ }
+ dat.deviceMgr.Start(context.Background())
+ dat.logicalDeviceMgr.Start(context.Background())
}
func (dat *DATest) stopAll() {
if dat.kClient != nil {
dat.kClient.Stop()
}
- if dat.core != nil {
- dat.core.Stop(context.Background())
+ if dat.logicalDeviceMgr != nil {
+ dat.logicalDeviceMgr.Stop(context.Background())
+ }
+ if dat.deviceMgr != nil {
+ dat.deviceMgr.Stop(context.Background())
+ }
+ if dat.kmp != nil {
+ dat.kmp.Stop()
}
if dat.etcdServer != nil {
stopEmbeddedEtcdServer(dat.etcdServer)
}
}
-func (dat *DATest) createDeviceAgent(t *testing.T) *DeviceAgent {
- deviceMgr := dat.core.deviceMgr
+//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
+func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
+ kvClientPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ peerPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ if etcdServer == nil {
+ return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
+ }
+ return etcdServer, kvClientPort, nil
+}
+
+func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
+ if server != nil {
+ server.Stop()
+ }
+}
+
+func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
+ addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
+ client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
+ if err != nil {
+ panic("no kv client")
+ }
+ return client
+}
+
+func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
+ deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newDeviceAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
@@ -142,7 +216,7 @@
return deviceAgent
}
-func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *DeviceAgent, globalWG *sync.WaitGroup) {
+func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *Agent, globalWG *sync.WaitGroup) {
originalDevice, err := da.getDevice(context.Background())
assert.Nil(t, err)
assert.NotNil(t, originalDevice)
diff --git a/rw_core/core/device/common.go b/rw_core/core/device/common.go
new file mode 100644
index 0000000..f2b8748
--- /dev/null
+++ b/rw_core/core/device/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package device
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "device"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/device/logical_agent.go
similarity index 88%
rename from rw_core/core/logical_device_agent.go
rename to rw_core/core/device/logical_agent.go
index a267a77..c6e4e73 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package device
import (
"context"
@@ -39,13 +39,13 @@
"google.golang.org/grpc/status"
)
-// LogicalDeviceAgent represent attributes of logical device agent
-type LogicalDeviceAgent struct {
+// LogicalAgent represent attributes of logical device agent
+type LogicalAgent struct {
logicalDeviceID string
serialNumber string
rootDeviceID string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
+ deviceMgr *Manager
+ ldeviceMgr *LogicalManager
clusterDataProxy *model.Proxy
stopped bool
deviceRoutes *route.DeviceRoutes
@@ -60,9 +60,9 @@
stopOnce sync.Once
}
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalDeviceManager,
- deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceAgent {
- var agent LogicalDeviceAgent
+func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+ deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
+ var agent LogicalAgent
agent.logicalDeviceID = id
agent.serialNumber = sn
agent.rootDeviceID = deviceID
@@ -77,7 +77,7 @@
}
// start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromDB bool) error {
+func (agent *LogicalAgent) start(ctx context.Context, loadFromDB bool) error {
needToStart := false
if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
return nil
@@ -106,7 +106,7 @@
// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
var datapathID uint64
- if datapathID, err = CreateDataPathID(agent.serialNumber); err != nil {
+ if datapathID, err = coreutils.CreateDataPathID(agent.serialNumber); err != nil {
return err
}
ld.DatapathId = datapathID
@@ -168,7 +168,7 @@
}
// stop stops the logical device agent. This removes the logical device from the data model.
-func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
+func (agent *LogicalAgent) stop(ctx context.Context) error {
var returnErr error
agent.stopOnce.Do(func() {
logger.Info("stopping-logical_device-agent")
@@ -195,7 +195,7 @@
}
// GetLogicalDevice returns the latest logical device data
-func (agent *LogicalDeviceAgent) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
+func (agent *LogicalAgent) GetLogicalDevice(ctx context.Context) (*voltha.LogicalDevice, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -204,7 +204,7 @@
}
// ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
+func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
logger.Debug("ListLogicalDeviceFlows")
logicalDevice, err := agent.GetLogicalDevice(ctx)
@@ -218,7 +218,7 @@
}
// ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalDeviceAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
+func (agent *LogicalAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
logger.Debug("ListLogicalDeviceMeters")
logicalDevice, err := agent.GetLogicalDevice(ctx)
@@ -232,7 +232,7 @@
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalDeviceAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
+func (agent *LogicalAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
logger.Debug("ListLogicalDeviceFlowGroups")
logicalDevice, err := agent.GetLogicalDevice(ctx)
@@ -246,7 +246,7 @@
}
// ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalDeviceAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
+func (agent *LogicalAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
logger.Debug("ListLogicalDevicePorts")
logicalDevice, err := agent.GetLogicalDevice(ctx)
if err != nil {
@@ -261,7 +261,7 @@
}
//updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(ctx context.Context, flows *ofp.Flows) error {
+func (agent *LogicalAgent) updateLogicalDeviceFlowsWithoutLock(ctx context.Context, flows *ofp.Flows) error {
ld := agent.getLogicalDeviceWithoutLock()
logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
@@ -275,7 +275,7 @@
}
//updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
-func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(ctx context.Context, meters *ofp.Meters) error {
+func (agent *LogicalAgent) updateLogicalDeviceMetersWithoutLock(ctx context.Context, meters *ofp.Meters) error {
ld := agent.getLogicalDeviceWithoutLock()
logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
@@ -289,7 +289,7 @@
}
//updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(ctx context.Context, flowGroups *ofp.FlowGroups) error {
+func (agent *LogicalAgent) updateLogicalDeviceFlowGroupsWithoutLock(ctx context.Context, flowGroups *ofp.FlowGroups) error {
ld := agent.getLogicalDeviceWithoutLock()
logger.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
@@ -303,12 +303,12 @@
}
// getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
-func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
+func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
logger.Debug("getLogicalDeviceWithoutLock")
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
}
-func (agent *LogicalDeviceAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
var err error
if port.Type == voltha.Port_ETHERNET_NNI {
@@ -334,7 +334,7 @@
// setupLogicalPorts is invoked once the logical device has been created and is ready to get ports
// added to it. While the logical device was being created we could have received requests to add
// NNI and UNI ports which were discarded. Now is the time to add them if needed
-func (agent *LogicalDeviceAgent) setupLogicalPorts(ctx context.Context) error {
+func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
logger.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// First add any NNI ports which could have been missing
if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
@@ -343,7 +343,7 @@
}
// Now, set up the UNI ports if needed.
- children, err := agent.deviceMgr.getAllChildDevices(ctx, agent.rootDeviceID)
+ children, err := agent.deviceMgr.GetAllChildDevices(ctx, agent.rootDeviceID)
if err != nil {
logger.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
return err
@@ -368,7 +368,7 @@
}
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
-func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
+func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
logger.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -392,7 +392,7 @@
}
// updatePortState updates the port state of the device
-func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
logger.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -422,7 +422,7 @@
}
// updatePortsState updates the ports state related to the device
-func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
+func (agent *LogicalAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -451,7 +451,7 @@
}
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
-func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
logger.Infow("setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
@@ -471,7 +471,7 @@
}
// deleteAllLogicalPorts deletes all logical ports associated with this logical device
-func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context) error {
+func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -488,7 +488,7 @@
}
// deleteAllUNILogicalPorts deletes all UNI logical ports associated with this parent device
-func (agent *LogicalDeviceAgent) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
+func (agent *LogicalAgent) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw("delete-all-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -520,7 +520,7 @@
}
//updateLogicalDevicePortsWithoutLock updates the
-func (agent *LogicalDeviceAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
+func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
oldPorts := device.Ports
device.Ports = newPorts
if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
@@ -531,7 +531,7 @@
}
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
-func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
+func (agent *LogicalAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
if agent.stopped {
return errors.New("logical device agent stopped")
}
@@ -549,7 +549,7 @@
//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
//that device graph was generated.
-func (agent *LogicalDeviceAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
+func (agent *LogicalAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
agent.lockDeviceRoutes.Lock()
defer agent.lockDeviceRoutes.Unlock()
@@ -569,8 +569,8 @@
}
//updateFlowTable updates the flow table of that logical device
-func (agent *LogicalDeviceAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
- logger.Debug("updateFlowTable")
+func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
+ logger.Debug("UpdateFlowTable")
if flow == nil {
return nil
}
@@ -594,7 +594,7 @@
}
//updateGroupTable updates the group table of that logical device
-func (agent *LogicalDeviceAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug("updateGroupTable")
if groupMod == nil {
return nil
@@ -616,7 +616,7 @@
}
// updateMeterTable updates the meter table of that logical device
-func (agent *LogicalDeviceAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
logger.Debug("updateMeterTable")
if meterMod == nil {
return nil
@@ -634,7 +634,7 @@
}
-func (agent *LogicalDeviceAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
logger.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
if meterMod == nil {
return nil
@@ -670,7 +670,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
logger.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
if meterMod == nil {
return nil
@@ -728,7 +728,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
logger.Debug("meterModify")
if meterMod == nil {
return nil
@@ -774,7 +774,7 @@
}
-func (agent *LogicalDeviceAgent) getUpdatedFlowsAfterDeletebyMeterID(flows []*ofp.OfpFlowStats, meterID uint32) (bool, []*ofp.OfpFlowStats) {
+func (agent *LogicalAgent) getUpdatedFlowsAfterDeletebyMeterID(flows []*ofp.OfpFlowStats, meterID uint32) (bool, []*ofp.OfpFlowStats) {
logger.Infow("Delete flows matching meter", log.Fields{"meter": meterID})
changed := false
//updatedFlows := make([]*ofp.OfpFlowStats, 0)
@@ -788,7 +788,7 @@
return changed, flows
}
-func (agent *LogicalDeviceAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
flowCommand := modCommand.GetCommand()
meterID := fu.GetMeterIdFromFlow(flow)
@@ -819,7 +819,7 @@
}
//flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
logger.Debugw("flowAdd", log.Fields{"flow": mod})
if mod == nil {
return nil
@@ -936,7 +936,7 @@
}
// GetMeterConfig returns meter config
-func (agent *LogicalDeviceAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
m := make(map[uint32]bool)
for _, flow := range flows {
if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
@@ -965,7 +965,7 @@
}
//flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
logger.Debug("flowDelete")
if mod == nil {
return nil
@@ -1051,7 +1051,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
@@ -1072,7 +1072,7 @@
return responses
}
-func (agent *LogicalDeviceAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) deleteFlowsAndGroupsFromDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -1092,7 +1092,7 @@
return responses
}
-func (agent *LogicalDeviceAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
+func (agent *LogicalAgent) updateFlowsAndGroupsOfDevice(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
@@ -1113,7 +1113,7 @@
}
//flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
logger.Debug("flowDeleteStrict")
if mod == nil {
return nil
@@ -1198,16 +1198,16 @@
}
//flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
return errors.New("flowModify not implemented")
}
//flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
return errors.New("flowModifyStrict not implemented")
}
-func (agent *LogicalDeviceAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug("groupAdd")
if groupMod == nil {
return nil
@@ -1251,7 +1251,7 @@
return fmt.Errorf("Groups %d already present", groupMod.GroupId)
}
-func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug("groupDelete")
if groupMod == nil {
return nil
@@ -1315,7 +1315,7 @@
return nil
}
-func (agent *LogicalDeviceAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug("groupModify")
if groupMod == nil {
return nil
@@ -1366,7 +1366,7 @@
}
// deleteLogicalPort removes the logical port
-func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
+func (agent *LogicalAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1408,7 +1408,7 @@
}
// deleteLogicalPorts removes the logical ports associated with that deviceId
-func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
+func (agent *LogicalAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -1444,7 +1444,7 @@
}
// enableLogicalPort enables the logical port
-func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
+func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1468,7 +1468,7 @@
}
// disableLogicalPort disabled the logical port
-func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
+func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1491,7 +1491,7 @@
return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
}
-func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
+func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
for routeLink, route := range agent.deviceRoutes.Routes {
logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
@@ -1503,7 +1503,7 @@
}
// GetRoute returns route
-func (agent *LogicalDeviceAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
+func (agent *LogicalAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
logger.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
routes := make([]route.Hop, 0)
@@ -1572,7 +1572,7 @@
//GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
//returns their port numbers. This function is invoked only during flow decomposition where the lock on the logical
//device is already held. Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalDeviceAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
lPorts := make([]uint32, 0)
var exclPort uint32
if len(excludePort) == 1 {
@@ -1588,12 +1588,12 @@
}
// GetDeviceRoutes returns device graph
-func (agent *LogicalDeviceAgent) GetDeviceRoutes() *route.DeviceRoutes {
+func (agent *LogicalAgent) GetDeviceRoutes() *route.DeviceRoutes {
return agent.deviceRoutes
}
//rebuildRoutes rebuilds the device routes
-func (agent *LogicalDeviceAgent) buildRoutes(ctx context.Context) error {
+func (agent *LogicalAgent) buildRoutes(ctx context.Context) error {
logger.Debugf("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -1617,7 +1617,7 @@
}
//updateRoutes updates the device routes
-func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
logger.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -1676,7 +1676,7 @@
// portUpdated is invoked when a port is updated on the logical device. Until
// the POST_ADD notification is fixed, we will use the logical device to
// update that data.
-func (agent *LogicalDeviceAgent) portUpdated(oldPorts, newPorts []*voltha.LogicalPort) interface{} {
+func (agent *LogicalAgent) portUpdated(oldPorts, newPorts []*voltha.LogicalPort) interface{} {
if reflect.DeepEqual(oldPorts, newPorts) {
logger.Debug("ports-have-not-changed")
return nil
@@ -1687,15 +1687,15 @@
// Send the port change events to the OF controller
for _, newP := range newPorts {
- go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
}
for _, change := range changedPorts {
- go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
}
for _, del := range deletedPorts {
- go agent.ldeviceMgr.grpcNbiHdlr.sendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
}
@@ -1706,7 +1706,7 @@
// added and an eror in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -1770,7 +1770,7 @@
return true, nil
}
-func (agent *LogicalDeviceAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
+func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
ldevice := agent.getLogicalDeviceWithoutLock()
for _, lPort := range ldevice.Ports {
if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo && lPort.Id == port.Label {
@@ -1784,7 +1784,7 @@
// added and an eror in case a valid error is encountered. If the port was successfully added it will return
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1845,7 +1845,7 @@
return true, nil
}
-func (agent *LogicalDeviceAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
+func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
logger.Debugw("packet-out", log.Fields{
"packet": hex.EncodeToString(packet.Data),
"inPort": packet.GetInPort(),
@@ -1858,18 +1858,18 @@
}
}
-func (agent *LogicalDeviceAgent) packetIn(port uint32, transactionID string, packet []byte) {
+func (agent *LogicalAgent) packetIn(port uint32, transactionID string, packet []byte) {
logger.Debugw("packet-in", log.Fields{
"port": port,
"packet": hex.EncodeToString(packet),
"transactionId": transactionID,
})
packetIn := fu.MkPacketIn(port, packet)
- agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
+ agent.ldeviceMgr.eventCallbacks.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}
-func (agent *LogicalDeviceAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
if exist := agent.logicalPortsNo[portNo]; !exist {
@@ -1877,7 +1877,7 @@
}
}
-func (agent *LogicalDeviceAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
+func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
for _, pNo := range portsNo {
@@ -1885,7 +1885,7 @@
}
}
-func (agent *LogicalDeviceAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
+func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
for _, lp := range lps {
@@ -1895,7 +1895,7 @@
}
}
-func (agent *LogicalDeviceAgent) isNNIPort(portNo uint32) bool {
+func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
if exist := agent.logicalPortsNo[portNo]; exist {
@@ -1904,7 +1904,7 @@
return false
}
-func (agent *LogicalDeviceAgent) getFirstNNIPort() (uint32, error) {
+func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
for portNo, nni := range agent.logicalPortsNo {
@@ -1916,7 +1916,7 @@
}
//GetNNIPorts returns NNI ports.
-func (agent *LogicalDeviceAgent) GetNNIPorts() []uint32 {
+func (agent *LogicalAgent) GetNNIPorts() []uint32 {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
nniPorts := make([]uint32, 0)
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/device/logical_agent_test.go
similarity index 86%
rename from rw_core/core/logical_device_agent_test.go
rename to rw_core/core/device/logical_agent_test.go
index 70d809a..3c3b2b0 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package device
import (
"context"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
"math/rand"
"sync"
"testing"
@@ -358,18 +361,20 @@
}
type LDATest struct {
- etcdServer *mock_etcd.EtcdServer
- core *Core
- kClient kafka.Client
- kvClientPort int
- oltAdapterName string
- onuAdapterName string
- coreInstanceID string
- defaultTimeout time.Duration
- maxTimeout time.Duration
- logicalDevice *voltha.LogicalDevice
- deviceIds []string
- done chan int
+ etcdServer *mock_etcd.EtcdServer
+ deviceMgr *Manager
+ kmp kafka.InterContainerProxy
+ logicalDeviceMgr *LogicalManager
+ kClient kafka.Client
+ kvClientPort int
+ oltAdapterName string
+ onuAdapterName string
+ coreInstanceID string
+ defaultTimeout time.Duration
+ maxTimeout time.Duration
+ logicalDevice *voltha.LogicalDevice
+ deviceIds []string
+ done chan int
}
func newLDATest() *LDATest {
@@ -447,7 +452,6 @@
}
func (lda *LDATest) startCore(inCompeteMode bool) {
- ctx := context.Background()
cfg := config.NewRWCoreFlags()
cfg.CorePairTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout
@@ -459,30 +463,59 @@
}
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
- setCoreCompeteMode(inCompeteMode)
client := setupKVClient(cfg, lda.coreInstanceID)
- lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient)
- err = lda.core.Start(ctx)
- if err != nil {
- logger.Fatal("Cannot start core")
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Host: cfg.KVStoreHost,
+ Port: cfg.KVStorePort,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+ PathPrefix: cfg.KVStoreDataPrefix}
+ lda.kmp = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.MsgClient(lda.kClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewProxy(backend, "/")
+ adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
+
+ lda.deviceMgr, lda.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+ lda.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+ if err = lda.kmp.Start(); err != nil {
+ logger.Fatal("Cannot start InterContainerProxy")
}
+ if err = adapterMgr.Start(context.Background()); err != nil {
+ logger.Fatal("Cannot start adapterMgr")
+ }
+ lda.deviceMgr.Start(context.Background())
+ lda.logicalDeviceMgr.Start(context.Background())
}
func (lda *LDATest) stopAll() {
if lda.kClient != nil {
lda.kClient.Stop()
}
- if lda.core != nil {
- lda.core.Stop(context.Background())
+ if lda.logicalDeviceMgr != nil {
+ lda.logicalDeviceMgr.Stop(context.Background())
+ }
+ if lda.deviceMgr != nil {
+ lda.deviceMgr.Stop(context.Background())
+ }
+ if lda.kmp != nil {
+ lda.kmp.Stop()
}
if lda.etcdServer != nil {
stopEmbeddedEtcdServer(lda.etcdServer)
}
}
-func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent {
- lDeviceMgr := lda.core.logicalDeviceMgr
- deviceMgr := lda.core.deviceMgr
+func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
+ lDeviceMgr := lda.logicalDeviceMgr
+ deviceMgr := lda.deviceMgr
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
@@ -494,7 +527,7 @@
return lDeviceAgent
}
-func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
+func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
assert.NotNil(t, originalLogicalDevice)
var localWG sync.WaitGroup
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/device/logical_manager.go
similarity index 75%
rename from rw_core/core/logical_device_manager.go
rename to rw_core/core/device/logical_manager.go
index 86e6b63..5005e0c 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -14,11 +14,12 @@
* limitations under the License.
*/
-package core
+package device
import (
"context"
"errors"
+ "github.com/opencord/voltha-go/rw_core/utils"
"strings"
"sync"
"time"
@@ -33,12 +34,11 @@
"google.golang.org/grpc/status"
)
-// LogicalDeviceManager represent logical device manager attributes
-type LogicalDeviceManager struct {
+// LogicalManager represent logical device manager attributes
+type LogicalManager struct {
logicalDeviceAgents sync.Map
- core *Core
- deviceMgr *DeviceManager
- grpcNbiHdlr *APIHandler
+ deviceMgr *Manager
+ eventCallbacks EventCallbacks
kafkaICProxy kafka.InterContainerProxy
clusterDataProxy *model.Proxy
exitChannel chan int
@@ -47,30 +47,22 @@
logicalDeviceLoadingInProgress map[string][]chan int
}
-func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy kafka.InterContainerProxy, cdProxy *model.Proxy, timeout time.Duration) *LogicalDeviceManager {
- var logicalDeviceMgr LogicalDeviceManager
- logicalDeviceMgr.core = core
- logicalDeviceMgr.exitChannel = make(chan int, 1)
- logicalDeviceMgr.deviceMgr = deviceMgr
- logicalDeviceMgr.kafkaICProxy = kafkaICProxy
- logicalDeviceMgr.clusterDataProxy = cdProxy
- logicalDeviceMgr.defaultTimeout = timeout
- logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
- logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
- return &logicalDeviceMgr
+type EventCallbacks interface {
+ SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus)
+ SendPacketIn(deviceID string, transactionID string, packet *openflow_13.OfpPacketIn)
}
-func (ldMgr *LogicalDeviceManager) setGrpcNbiHandler(grpcNbiHandler *APIHandler) {
- ldMgr.grpcNbiHdlr = grpcNbiHandler
+func (ldMgr *LogicalManager) SetEventCallbacks(callbacks EventCallbacks) {
+ ldMgr.eventCallbacks = callbacks
}
-func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
+func (ldMgr *LogicalManager) Start(ctx context.Context) {
logger.Info("starting-logical-device-manager")
probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
logger.Info("logical-device-manager-started")
}
-func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
+func (ldMgr *LogicalManager) Stop(ctx context.Context) {
logger.Info("stopping-logical-device-manager")
ldMgr.exitChannel <- 1
probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusStopped)
@@ -89,7 +81,7 @@
}
}
-func (ldMgr *LogicalDeviceManager) addLogicalDeviceAgentToMap(agent *LogicalDeviceAgent) {
+func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
}
@@ -97,11 +89,11 @@
// getLogicalDeviceAgent returns the logical device agent. If the device is not in memory then the device will
// be loaded from dB and a logical device agent created to managed it.
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceAgent(ctx context.Context, logicalDeviceID string) *LogicalDeviceAgent {
+func (ldMgr *LogicalManager) getLogicalDeviceAgent(ctx context.Context, logicalDeviceID string) *LogicalAgent {
logger.Debugw("get-logical-device-agent", log.Fields{"logical-device-id": logicalDeviceID})
agent, ok := ldMgr.logicalDeviceAgents.Load(logicalDeviceID)
if ok {
- lda := agent.(*LogicalDeviceAgent)
+ lda := agent.(*LogicalAgent)
if lda.logicalDevice == nil {
// This can happen when an agent for the logical device has been created but the logical device
// itself is not ready for action as it is waiting for switch and port capabilities from the
@@ -114,19 +106,19 @@
// Try to load into memory - loading will also create the logical device agent
if err := ldMgr.load(ctx, logicalDeviceID); err == nil {
if agent, ok = ldMgr.logicalDeviceAgents.Load(logicalDeviceID); ok {
- return agent.(*LogicalDeviceAgent)
+ return agent.(*LogicalAgent)
}
}
return nil
}
-func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceID string) {
+func (ldMgr *LogicalManager) deleteLogicalDeviceAgent(logicalDeviceID string) {
ldMgr.logicalDeviceAgents.Delete(logicalDeviceID)
}
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
// it will be fetched from the dB
-func (ldMgr *LogicalDeviceManager) getLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
logger.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.GetLogicalDevice(ctx)
@@ -134,8 +126,8 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
-//listLogicalDevices returns the list of all logical devices
-func (ldMgr *LogicalDeviceManager) listLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
+//ListLogicalDevices returns the list of all logical devices
+func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
logger.Debug("ListAllLogicalDevices")
var logicalDevices []*voltha.LogicalDevice
@@ -146,7 +138,7 @@
return &voltha.LogicalDevices{Items: logicalDevices}, nil
}
-func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
+func (ldMgr *LogicalManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
logger.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -157,7 +149,7 @@
// For now use the serial number - it may contain any combination of alphabetic characters and numbers,
// with length varying from eight characters to a maximum of 14 characters. Mac Address is part of oneof
// in the Device model. May need to be moved out.
- id := CreateLogicalDeviceID()
+ id := utils.CreateLogicalDeviceID()
sn := strings.Replace(device.MacAddress, ":", "", -1)
if id == "" {
logger.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id, "serial-number": sn})
@@ -191,12 +183,12 @@
// stopManagingLogicalDeviceWithDeviceId stops the management of the logical device. This implies removal of any
// reference of this logical device in cache. The device Id is passed as param because the logical device may already
// have been removed from the model. This function returns the logical device Id if found
-func (ldMgr *LogicalDeviceManager) stopManagingLogicalDeviceWithDeviceID(ctx context.Context, id string) string {
+func (ldMgr *LogicalManager) stopManagingLogicalDeviceWithDeviceID(ctx context.Context, id string) string {
logger.Infow("stop-managing-logical-device", log.Fields{"deviceId": id})
// Go over the list of logical device agents to find the one which has rootDeviceId as id
var ldID = ""
ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool {
- ldAgent := value.(*LogicalDeviceAgent)
+ ldAgent := value.(*LogicalAgent)
if ldAgent.rootDeviceID == id {
logger.Infow("stopping-logical-device-agent", log.Fields{"lDeviceId": key})
if err := ldAgent.stop(ctx); err != nil {
@@ -212,7 +204,7 @@
}
//getLogicalDeviceFromModel retrieves the logical device data from the model.
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
logicalDevice := &voltha.LogicalDevice{}
if have, err := ldMgr.clusterDataProxy.Get(ctx, "logical_devices/"+lDeviceID, logicalDevice); err != nil {
logger.Errorw("failed-to-get-logical-devices-from-cluster-proxy", log.Fields{"error": err})
@@ -225,7 +217,7 @@
}
// load loads a logical device manager in memory
-func (ldMgr *LogicalDeviceManager) load(ctx context.Context, lDeviceID string) error {
+func (ldMgr *LogicalManager) load(ctx context.Context, lDeviceID string) error {
if lDeviceID == "" {
return nil
}
@@ -270,7 +262,7 @@
return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceID)
}
-func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
logger.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -291,7 +283,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceID(ctx context.Context, device *voltha.Device) (*string, error) {
+func (ldMgr *LogicalManager) getLogicalDeviceID(ctx context.Context, device *voltha.Device) (*string, error) {
// Device can either be a parent or a child device
if device.Root {
// Parent device. The ID of a parent device is the logical device ID
@@ -305,7 +297,7 @@
return nil, status.Errorf(codes.NotFound, "%s", device.Id)
}
-func (ldMgr *LogicalDeviceManager) getLogicalDeviceIDFromDeviceID(ctx context.Context, deviceID string) (*string, error) {
+func (ldMgr *LogicalManager) getLogicalDeviceIDFromDeviceID(ctx context.Context, deviceID string) (*string, error) {
// Get the device
var device *voltha.Device
var err error
@@ -315,7 +307,7 @@
return ldMgr.getLogicalDeviceID(ctx, device)
}
-func (ldMgr *LogicalDeviceManager) getLogicalPortID(ctx context.Context, device *voltha.Device) (*voltha.LogicalPortId, error) {
+func (ldMgr *LogicalManager) getLogicalPortID(ctx context.Context, device *voltha.Device) (*voltha.LogicalPortId, error) {
// Get the logical device where this device is attached
var lDeviceID *string
var err error
@@ -323,7 +315,7 @@
return nil, err
}
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.getLogicalDevice(ctx, *lDeviceID); err != nil {
+ if lDevice, err = ldMgr.GetLogicalDevice(ctx, *lDeviceID); err != nil {
return nil, err
}
// Go over list of ports
@@ -336,7 +328,7 @@
}
// ListLogicalDeviceFlows returns the flows of logical device
-func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceFlows(ctx)
@@ -345,7 +337,7 @@
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (ldMgr *LogicalDeviceManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceFlowGroups(ctx)
@@ -354,7 +346,7 @@
}
// ListLogicalDevicePorts returns logical device ports
-func (ldMgr *LogicalDeviceManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
+func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDevicePorts(ctx)
@@ -362,11 +354,11 @@
return nil, status.Errorf(codes.NotFound, "%s", id)
}
-func (ldMgr *LogicalDeviceManager) getLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+func (ldMgr *LogicalManager) GetLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
// Get the logical device where this device is attached
var err error
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.getLogicalDevice(ctx, lPortID.Id); err != nil {
+ if lDevice, err = ldMgr.GetLogicalDevice(ctx, lPortID.Id); err != nil {
return nil, err
}
// Go over list of ports
@@ -380,7 +372,7 @@
// updateLogicalPort sets up a logical port on the logical device based on the device port
// information, if needed
-func (ldMgr *LogicalDeviceManager) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ldMgr *LogicalManager) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
ldID, err := ldMgr.getLogicalDeviceID(ctx, device)
if err != nil || *ldID == "" {
// This is not an error as the logical device may not have been created at this time. In such a case,
@@ -396,12 +388,12 @@
}
// deleteLogicalPort removes the logical port associated with a device
-func (ldMgr *LogicalDeviceManager) deleteLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) error {
+func (ldMgr *LogicalManager) deleteLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) error {
logger.Debugw("deleting-logical-port", log.Fields{"LDeviceId": lPortID.Id})
// Get logical port
var logicalPort *voltha.LogicalPort
var err error
- if logicalPort, err = ldMgr.getLogicalPort(ctx, lPortID); err != nil {
+ if logicalPort, err = ldMgr.GetLogicalPort(ctx, lPortID); err != nil {
logger.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
return err
}
@@ -420,7 +412,7 @@
}
// deleteLogicalPort removes the logical port associated with a child device
-func (ldMgr *LogicalDeviceManager) deleteLogicalPorts(ctx context.Context, deviceID string) error {
+func (ldMgr *LogicalManager) deleteLogicalPorts(ctx context.Context, deviceID string) error {
logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
// Get logical port
ldID, err := ldMgr.getLogicalDeviceIDFromDeviceID(ctx, deviceID)
@@ -437,7 +429,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
logger.Debugw("setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
@@ -462,7 +454,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
logger.Debugw("deleteAllLogicalPorts", log.Fields{"deviceId": device.Id})
var ldID *string
@@ -480,7 +472,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
+func (ldMgr *LogicalManager) deleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
var ldID *string
@@ -497,7 +489,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
+func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
logger.Debugw("updatePortState", log.Fields{"deviceId": deviceID, "state": state, "portNo": portNo})
var ldID *string
@@ -515,7 +507,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
+func (ldMgr *LogicalManager) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
logger.Debugw("updatePortsState", log.Fields{"deviceId": device.Id, "state": state, "current-data": device})
var ldID *string
@@ -533,24 +525,24 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) updateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
- logger.Debugw("updateFlowTable", log.Fields{"logicalDeviceId": id})
+func (ldMgr *LogicalManager) UpdateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
+ logger.Debugw("UpdateFlowTable", log.Fields{"logicalDeviceId": id})
var res interface{}
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateFlowTable(ctx, flow)
- logger.Debugw("updateFlowTable-result", log.Fields{"result": res})
+ logger.Debugw("UpdateFlowTable-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id)
}
sendAPIResponse(ctx, ch, res)
}
-func (ldMgr *LogicalDeviceManager) updateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
- logger.Debugw("updateMeterTable", log.Fields{"logicalDeviceId": id})
+func (ldMgr *LogicalManager) UpdateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
+ logger.Debugw("UpdateMeterTable", log.Fields{"logicalDeviceId": id})
var res interface{}
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateMeterTable(ctx, meter)
- logger.Debugw("updateMeterTable-result", log.Fields{"result": res})
+ logger.Debugw("UpdateMeterTable-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id)
}
@@ -558,50 +550,50 @@
}
// ListLogicalDeviceMeters returns logical device meters
-func (ldMgr *LogicalDeviceManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
+func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
return agent.ListLogicalDeviceMeters(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
-func (ldMgr *LogicalDeviceManager) updateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
- logger.Debugw("updateGroupTable", log.Fields{"logicalDeviceId": id})
+func (ldMgr *LogicalManager) UpdateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
+ logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": id})
var res interface{}
if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
res = agent.updateGroupTable(ctx, groupMod)
- logger.Debugw("updateGroupTable-result", log.Fields{"result": res})
+ logger.Debugw("UpdateGroupTable-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id)
}
sendAPIResponse(ctx, ch, res)
}
-func (ldMgr *LogicalDeviceManager) enableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
- logger.Debugw("enableLogicalPort", log.Fields{"logicalDeviceId": id})
+func (ldMgr *LogicalManager) EnableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
+ logger.Debugw("EnableLogicalPort", log.Fields{"logicalDeviceId": id})
var res interface{}
if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
res = agent.enableLogicalPort(ctx, id.PortId)
- logger.Debugw("enableLogicalPort-result", log.Fields{"result": res})
+ logger.Debugw("EnableLogicalPort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
sendAPIResponse(ctx, ch, res)
}
-func (ldMgr *LogicalDeviceManager) disableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
- logger.Debugw("disableLogicalPort", log.Fields{"logicalDeviceId": id})
+func (ldMgr *LogicalManager) DisableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
+ logger.Debugw("DisableLogicalPort", log.Fields{"logicalDeviceId": id})
var res interface{}
if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
res = agent.disableLogicalPort(ctx, id.PortId)
- logger.Debugw("disableLogicalPort-result", log.Fields{"result": res})
+ logger.Debugw("DisableLogicalPort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
sendAPIResponse(ctx, ch, res)
}
-func (ldMgr *LogicalDeviceManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
+func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
logger.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
agent.packetIn(port, transactionID, packet)
@@ -610,3 +602,11 @@
}
return nil
}
+
+func (ldMgr *LogicalManager) PacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
+ agent.packetOut(ctx, packet.PacketOut)
+ } else {
+ logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+ }
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device/manager.go
similarity index 77%
rename from rw_core/core/device_manager.go
rename to rw_core/core/device/manager.go
index 8f9847b..48edc5b 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device/manager.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package device
import (
"context"
@@ -25,6 +25,8 @@
"time"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -36,15 +38,14 @@
"google.golang.org/grpc/status"
)
-// DeviceManager represent device manager attributes
-type DeviceManager struct {
+// Manager represent device manager attributes
+type Manager struct {
deviceAgents sync.Map
rootDevices map[string]bool
lockRootDeviceMap sync.RWMutex
- core *Core
- adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
- logicalDeviceMgr *LogicalDeviceManager
+ adapterProxy *remote.AdapterProxy
+ adapterMgr *adapter.Manager
+ logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
clusterDataProxy *model.Proxy
@@ -55,35 +56,41 @@
deviceLoadingInProgress map[string][]chan int
}
-func newDeviceManager(core *Core) *DeviceManager {
+func NewDeviceManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+ deviceMgr := &Manager{
+ exitChannel: make(chan int, 1),
+ rootDevices: make(map[string]bool),
+ kafkaICProxy: kmp,
+ adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
+ coreInstanceID: coreInstanceID,
+ clusterDataProxy: proxy,
+ adapterMgr: adapterMgr,
+ defaultTimeout: defaultCoreTimeout * time.Millisecond,
+ deviceLoadingInProgress: make(map[string][]chan int),
+ }
+ logicalDeviceMgr := &LogicalManager{
+ exitChannel: make(chan int, 1),
+ deviceMgr: deviceMgr,
+ kafkaICProxy: kmp,
+ clusterDataProxy: proxy,
+ defaultTimeout: defaultCoreTimeout,
+ logicalDeviceLoadingInProgress: make(map[string][]chan int),
+ }
+ deviceMgr.logicalDeviceMgr = logicalDeviceMgr
- endpointManager := kafka.NewEndpointManager(&core.backend)
+ adapterMgr.SetAdapterRestartedCallback(deviceMgr.adapterRestarted)
- var deviceMgr DeviceManager
- deviceMgr.core = core
- deviceMgr.exitChannel = make(chan int, 1)
- deviceMgr.rootDevices = make(map[string]bool)
- deviceMgr.kafkaICProxy = core.kmp
- deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic, endpointManager)
- deviceMgr.coreInstanceID = core.instanceID
- deviceMgr.clusterDataProxy = core.clusterDataProxy
- deviceMgr.adapterMgr = core.adapterMgr
- deviceMgr.lockRootDeviceMap = sync.RWMutex{}
- deviceMgr.defaultTimeout = time.Duration(core.config.DefaultCoreTimeout) * time.Millisecond
- deviceMgr.devicesLoadingLock = sync.RWMutex{}
- deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
- return &deviceMgr
+ return deviceMgr, logicalDeviceMgr
}
-func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
+func (dMgr *Manager) Start(ctx context.Context) {
logger.Info("starting-device-manager")
- dMgr.logicalDeviceMgr = logicalDeviceMgr
dMgr.stateTransitions = NewTransitionMap(dMgr)
probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusRunning)
logger.Info("device-manager-started")
}
-func (dMgr *DeviceManager) stop(ctx context.Context) {
+func (dMgr *Manager) Stop(ctx context.Context) {
logger.Info("stopping-device-manager")
dMgr.exitChannel <- 1
probe.UpdateStatusFromContext(ctx, "device-manager", probe.ServiceStatusStopped)
@@ -102,7 +109,7 @@
}
}
-func (dMgr *DeviceManager) addDeviceAgentToMap(agent *DeviceAgent) {
+func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
dMgr.deviceAgents.Store(agent.deviceID, agent)
}
@@ -112,7 +119,7 @@
}
-func (dMgr *DeviceManager) deleteDeviceAgentFromMap(agent *DeviceAgent) {
+func (dMgr *Manager) deleteDeviceAgentFromMap(agent *Agent) {
dMgr.deviceAgents.Delete(agent.deviceID)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
@@ -120,10 +127,10 @@
}
// getDeviceAgent returns the agent managing the device. If the device is not in memory, it will loads it, if it exists
-func (dMgr *DeviceManager) getDeviceAgent(ctx context.Context, deviceID string) *DeviceAgent {
+func (dMgr *Manager) getDeviceAgent(ctx context.Context, deviceID string) *Agent {
agent, ok := dMgr.deviceAgents.Load(deviceID)
if ok {
- return agent.(*DeviceAgent)
+ return agent.(*Agent)
}
// Try to load into memory - loading will also create the device agent and set the device ownership
err := dMgr.load(ctx, deviceID)
@@ -132,7 +139,7 @@
if !ok {
return nil
}
- return agent.(*DeviceAgent)
+ return agent.(*Agent)
}
//TODO: Change the return params to return an error as well
logger.Errorw("loading-device-failed", log.Fields{"deviceId": deviceID, "error": err})
@@ -140,7 +147,7 @@
}
// listDeviceIdsFromMap returns the list of device IDs that are in memory
-func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+func (dMgr *Manager) listDeviceIdsFromMap() *voltha.IDs {
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -151,7 +158,7 @@
return result
}
-func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
+func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
logger.Errorf("Failed to fetch parent device info")
@@ -163,12 +170,12 @@
sendResponse(ctx, ch, errors.New("Device is already pre-provisioned"))
return
}
- logger.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+ logger.Debugw("CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -180,8 +187,8 @@
sendResponse(ctx, ch, device)
}
-func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("enableDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ logger.Debugw("EnableDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.enableDevice(ctx)
@@ -192,12 +199,12 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("disableDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ logger.Debugw("DisableDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.disableDevice(ctx)
- logger.Debugw("disableDevice-result", log.Fields{"result": res})
+ logger.Debugw("DisableDevice-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
@@ -205,24 +212,24 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) rebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("rebootDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ logger.Debugw("RebootDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.rebootDevice(ctx)
- logger.Debugw("rebootDevice-result", log.Fields{"result": res})
+ logger.Debugw("RebootDevice-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) deleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("deleteDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ logger.Debugw("DeleteDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
res = agent.deleteDevice(ctx)
- logger.Debugw("deleteDevice-result", log.Fields{"result": res})
+ logger.Debugw("DeleteDevice-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", id.Id)
}
@@ -232,7 +239,7 @@
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
// This function is called only in the Core that does not own this device. In the Core that owns this device then a
// deletion deletion also includes removal of any reference of this device.
-func (dMgr *DeviceManager) stopManagingDevice(ctx context.Context, id string) {
+func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
logger.Infow("stopManagingDevice", log.Fields{"deviceId": id})
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if root, _ := dMgr.IsRootDevice(id); root {
@@ -249,14 +256,14 @@
}
// RunPostDeviceDelete removes any reference of this device
-func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
logger.Infow("RunPostDeviceDelete", log.Fields{"deviceId": cDevice.Id})
dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
}
// GetDevice will returns a device, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
+func (dMgr *Manager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
logger.Debugw("GetDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDevice(ctx)
@@ -265,7 +272,7 @@
}
// GetChildDevice will return a device, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
+func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
logger.Debugw("GetChildDevice", log.Fields{"parentDeviceid": parentDeviceID, "serialNumber": serialNumber,
"parentPortNo": parentPortNo, "onuId": onuID})
@@ -328,7 +335,7 @@
}
// GetChildDeviceWithProxyAddress will return a device based on proxy address
-func (dMgr *DeviceManager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
+func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
logger.Debugw("GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
var parentDevice *voltha.Device
@@ -365,13 +372,13 @@
}
// IsDeviceInCache returns true if device is found in the map
-func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+func (dMgr *Manager) IsDeviceInCache(id string) bool {
_, exist := dMgr.deviceAgents.Load(id)
return exist
}
// IsRootDevice returns true if root device is found in the map
-func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
+func (dMgr *Manager) IsRootDevice(id string) (bool, error) {
dMgr.lockRootDeviceMap.RLock()
defer dMgr.lockRootDeviceMap.RUnlock()
if exist := dMgr.rootDevices[id]; exist {
@@ -381,7 +388,7 @@
}
// ListDevices retrieves the latest devices from the data model
-func (dMgr *DeviceManager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
+func (dMgr *Manager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
logger.Debug("ListDevices")
result := &voltha.Devices{}
@@ -395,7 +402,7 @@
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
@@ -409,7 +416,7 @@
}
//isParentDeviceExist checks whether device is already preprovisioned.
-func (dMgr *DeviceManager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
+func (dMgr *Manager) isParentDeviceExist(ctx context.Context, newDevice *voltha.Device) (bool, error) {
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
if err := dMgr.clusterDataProxy.List(ctx, "devices", &devices); err != nil {
@@ -431,7 +438,7 @@
}
//getDeviceFromModelretrieves the device data from the model.
-func (dMgr *DeviceManager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
+func (dMgr *Manager) getDeviceFromModel(ctx context.Context, deviceID string) (*voltha.Device, error) {
device := &voltha.Device{}
if have, err := dMgr.clusterDataProxy.Get(ctx, "devices/"+deviceID, device); err != nil {
logger.Errorw("failed-to-get-device-info-from-cluster-proxy", log.Fields{"error": err})
@@ -444,7 +451,7 @@
}
// loadDevice loads the deviceID in memory, if not present
-func (dMgr *DeviceManager) loadDevice(ctx context.Context, deviceID string) (*DeviceAgent, error) {
+func (dMgr *Manager) loadDevice(ctx context.Context, deviceID string) (*Agent, error) {
if deviceID == "" {
return nil, status.Error(codes.InvalidArgument, "deviceId empty")
}
@@ -458,7 +465,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
@@ -487,13 +494,13 @@
<-ch
}
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
- return agent.(*DeviceAgent), nil
+ return agent.(*Agent), nil
}
return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceID)
}
// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
-func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device) error {
+func (dMgr *Manager) loadRootDeviceParentAndChildren(ctx context.Context, device *voltha.Device) error {
logger.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
if device.Root {
// Scenario A
@@ -525,10 +532,10 @@
// in memory is for improved performance. It is not imperative that a device needs to be in memory when a request
// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
// and the proceed with the request.
-func (dMgr *DeviceManager) load(ctx context.Context, deviceID string) error {
+func (dMgr *Manager) load(ctx context.Context, deviceID string) error {
logger.Debug("load...")
// First load the device - this may fail in case the device was deleted intentionally by the other core
- var dAgent *DeviceAgent
+ var dAgent *Agent
var err error
if dAgent, err = dMgr.loadDevice(ctx, deviceID); err != nil {
return err
@@ -562,7 +569,7 @@
}
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
-func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+func (dMgr *Manager) ListDeviceIds() (*voltha.IDs, error) {
logger.Debug("ListDeviceIDs")
// Report only device IDs that are in the device agent map
return dMgr.listDeviceIdsFromMap(), nil
@@ -570,7 +577,7 @@
//ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
//trigger loading the devices along with their children and parent in memory
-func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
logger.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
var res interface{}
if ids != nil && len(ids.Items) != 0 {
@@ -602,7 +609,7 @@
}
// adapterRestarted is invoked whenever an adapter is restarted
-func (dMgr *DeviceManager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
+func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
logger.Debugw("adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
@@ -615,7 +622,7 @@
responses := make([]utils.Response, 0)
for rootDeviceID := range dMgr.rootDevices {
if rootDevice, _ := dMgr.getDeviceFromModel(ctx, rootDeviceID); rootDevice != nil {
- isDeviceOwnedByService, err := dMgr.adapterProxy.endpointManager.IsDeviceOwnedByService(rootDeviceID, adapter.Type, adapter.CurrentReplica)
+ isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(rootDeviceID, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
continue
@@ -632,7 +639,7 @@
for _, port := range rootDevice.Ports {
for _, peer := range port.Peers {
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
- isDeviceOwnedByService, err := dMgr.adapterProxy.endpointManager.IsDeviceOwnedByService(childDevice.Id, adapter.Type, adapter.CurrentReplica)
+ isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(childDevice.Id, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
}
@@ -665,13 +672,13 @@
return nil
}
-func (dMgr *DeviceManager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
+func (dMgr *Manager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
// Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
// point of creating a device agent (if the device is not being managed by this Core) before sending the request
// to the adapter. We will therefore bypass the adapter adapter and send the request directly to the adapter via
- // the adapter_proxy.
+ // the adapter proxy.
response := utils.NewResponse()
- ch, err := dMgr.adapterProxy.reconcileDevice(ctx, device)
+ ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
if err != nil {
response.Error(err)
}
@@ -688,7 +695,7 @@
return response
}
-func (dMgr *DeviceManager) reconcileChildDevices(ctx context.Context, parentDeviceID string) error {
+func (dMgr *Manager) ReconcileChildDevices(ctx context.Context, parentDeviceID string) error {
if parentDevice, _ := dMgr.getDeviceFromModel(ctx, parentDeviceID); parentDevice != nil {
responses := make([]utils.Response, 0)
for _, port := range parentDevice.Ports {
@@ -706,15 +713,15 @@
return nil
}
-func (dMgr *DeviceManager) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- logger.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceid": device.Id, "device": device})
+func (dMgr *Manager) UpdateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw("UpdateDeviceUsingAdapterData", log.Fields{"deviceid": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.updateDeviceUsingAdapterData(ctx, device)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
-func (dMgr *DeviceManager) addPort(ctx context.Context, deviceID string, port *voltha.Port) error {
+func (dMgr *Manager) AddPort(ctx context.Context, deviceID string, port *voltha.Port) error {
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent != nil {
if err := agent.addPort(ctx, port); err != nil {
@@ -749,7 +756,7 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *Manager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
@@ -757,7 +764,7 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
@@ -765,7 +772,7 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+func (dMgr *Manager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
logger.Debugw("updateFlowsAndGroups", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
@@ -773,9 +780,9 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-// updatePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
+// UpdatePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
-func (dMgr *DeviceManager) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
+func (dMgr *Manager) UpdatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
var res interface{}
if pmConfigs.Id == "" {
res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
@@ -787,8 +794,8 @@
sendResponse(ctx, ch, res)
}
-// initPmConfigs initialize the pm configs as defined by the adapter.
-func (dMgr *DeviceManager) initPmConfigs(ctx context.Context, deviceID string, pmConfigs *voltha.PmConfigs) error {
+// InitPmConfigs initialize the pm configs as defined by the adapter.
+func (dMgr *Manager) InitPmConfigs(ctx context.Context, deviceID string, pmConfigs *voltha.PmConfigs) error {
if pmConfigs.Id == "" {
return status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
}
@@ -798,14 +805,14 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) listPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
+func (dMgr *Manager) ListPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.listPmConfigs(ctx)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
+func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
logger.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -813,15 +820,15 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
- logger.Debugw("getPorts", log.Fields{"deviceid": deviceID, "portType": portType})
+func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+ logger.Debugw("GetPorts", log.Fields{"deviceid": deviceID, "portType": portType})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceID string, portNo uint32) (*ic.PortCapability, error) {
+func (dMgr *Manager) getPortCapability(ctx context.Context, deviceID string, portNo uint32) (*ic.PortCapability, error) {
logger.Debugw("getPortCapability", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -829,16 +836,16 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw("updateDeviceStatus", log.Fields{"deviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+ logger.Debugw("UpdateDeviceStatus", log.Fields{"deviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceStatus(ctx, operStatus, connStatus)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) updateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw("updateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+ logger.Debugw("UpdateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
var parentDevice *voltha.Device
var err error
if parentDevice, err = dMgr.GetDevice(ctx, deviceID); err != nil {
@@ -861,8 +868,8 @@
return nil
}
-func (dMgr *DeviceManager) updatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Debugw("updatePortState", log.Fields{"deviceid": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+func (dMgr *Manager) UpdatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw("UpdatePortState", log.Fields{"deviceid": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
logger.Errorw("updating-port-state-failed", log.Fields{"deviceid": deviceID, "portNo": portNo, "error": err})
@@ -889,14 +896,14 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) deleteAllPorts(ctx context.Context, deviceID string) error {
+func (dMgr *Manager) DeleteAllPorts(ctx context.Context, deviceID string) error {
logger.Debugw("DeleteAllPorts", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.deleteAllPorts(ctx); err != nil {
return err
}
// Notify the logical device manager to remove all logical ports, if needed.
- // At this stage the device itself may gave been deleted already at a deleteAllPorts
+ // At this stage the device itself may gave been deleted already at a DeleteAllPorts
// typically is part of a device deletion phase.
if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
go func() {
@@ -914,9 +921,9 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-//updatePortsState updates all ports on the device
-func (dMgr *DeviceManager) updatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
- logger.Debugw("updatePortsState", log.Fields{"deviceid": deviceID})
+//UpdatePortsState updates all ports on the device
+func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, state voltha.OperStatus_Types) error {
+ logger.Debugw("UpdatePortsState", log.Fields{"deviceid": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
switch state {
@@ -948,19 +955,14 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) childDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
+func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
- logger.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
if deviceType == "" && vendorID != "" {
logger.Debug("device-type-is-nil-fetching-device-type")
- var deviceTypes []*voltha.DeviceType
- if err := dMgr.adapterMgr.clusterDataProxy.List(ctx, "device_types", &deviceTypes); err != nil {
- logger.Errorw("failed-to-get-device-type-info", log.Fields{"error": err})
- return nil, err
- }
OLoop:
- for _, dType := range deviceTypes {
+ for _, dType := range dMgr.adapterMgr.ListDeviceTypes() {
for _, v := range dType.VendorIds {
if v == vendorID {
deviceType = dType.Adapter
@@ -1001,7 +1003,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
@@ -1030,7 +1032,7 @@
return childDevice, nil
}
-func (dMgr *DeviceManager) processTransition(ctx context.Context, device *voltha.Device, previousState *DeviceState) error {
+func (dMgr *Manager) processTransition(ctx context.Context, device *voltha.Device, previousState *deviceState) error {
// This will be triggered on every state update
logger.Debugw("state-transition", log.Fields{
"device": device.Id,
@@ -1057,7 +1059,7 @@
return nil
}
-func (dMgr *DeviceManager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
+func (dMgr *Manager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
logger.Debugw("packetOut", log.Fields{"deviceId": deviceID, "outPort": outPort})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.packetOut(ctx, outPort, packet)
@@ -1066,7 +1068,7 @@
}
// PacketIn receives packet from adapter
-func (dMgr *DeviceManager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
+func (dMgr *Manager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
logger.Debugw("PacketIn", log.Fields{"deviceId": deviceID, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
@@ -1086,7 +1088,7 @@
return nil
}
-func (dMgr *DeviceManager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
+func (dMgr *Manager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
logger.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.setParentID(ctx, device, parentID)
@@ -1095,7 +1097,7 @@
}
// CreateLogicalDevice creates logical device in core
-func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *Manager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
logger.Info("CreateLogicalDevice")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
@@ -1111,7 +1113,7 @@
}
// DeleteLogicalDevice deletes logical device from core
-func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *Manager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
logger.Info("DeleteLogicalDevice")
var err error
if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
@@ -1125,7 +1127,7 @@
}
// DeleteLogicalPort removes the logical port associated with a device
-func (dMgr *DeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+func (dMgr *Manager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
logger.Info("deleteLogicalPort")
var err error
// Get the logical port associated with this device
@@ -1142,7 +1144,7 @@
}
// DeleteLogicalPorts removes the logical ports associated with that deviceId
-func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *Manager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Debugw("delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
// Just log the error. The logical device or port may already have been deleted before this callback is invoked.
@@ -1151,7 +1153,7 @@
return nil
}
-func (dMgr *DeviceManager) getParentDevice(ctx context.Context, childDevice *voltha.Device) *voltha.Device {
+func (dMgr *Manager) getParentDevice(ctx context.Context, childDevice *voltha.Device) *voltha.Device {
// Sanity check
if childDevice.Root {
// childDevice is the parent device
@@ -1161,10 +1163,10 @@
return parentDevice
}
-//childDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
+//ChildDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
-func (dMgr *DeviceManager) childDevicesLost(ctx context.Context, parentDeviceID string) error {
- logger.Debug("childDevicesLost")
+func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
+ logger.Debug("ChildDevicesLost")
var err error
var parentDevice *voltha.Device
if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
@@ -1174,10 +1176,10 @@
return dMgr.DisableAllChildDevices(ctx, parentDevice)
}
-//childDevicesDetected is invoked by an adapter when child devices are found, typically after after a
+//ChildDevicesDetected is invoked by an adapter when child devices are found, typically after after a
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
-func (dMgr *DeviceManager) childDevicesDetected(ctx context.Context, parentDeviceID string) error {
- logger.Debug("childDevicesDetected")
+func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
+ logger.Debug("ChildDevicesDetected")
var err error
var parentDevice *voltha.Device
var childDeviceIds []string
@@ -1221,7 +1223,7 @@
*/
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
-func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
+func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug("DisableAllChildDevices")
var childDeviceIds []string
var err error
@@ -1243,7 +1245,7 @@
}
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
+func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
logger.Debug("DeleteAllChildDevices")
var childDeviceIds []string
var err error
@@ -1266,7 +1268,7 @@
}
//DeleteAllUNILogicalPorts is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device) error {
+func (dMgr *Manager) DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device) error {
logger.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": curr.Id})
if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, curr); err != nil {
// Just log the error and let the remaining pipeline proceed - ports may already have been deleted
@@ -1276,7 +1278,7 @@
}
//DeleteAllLogicalPorts is invoked as a callback when the parent device's connection status moves to UNREACHABLE
-func (dMgr *DeviceManager) DeleteAllLogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *Manager) DeleteAllLogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw("delete-all-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, parentDevice); err != nil {
// Just log error as logical device may already have been deleted
@@ -1286,7 +1288,7 @@
}
//DeleteAllDeviceFlows is invoked as a callback when the parent device's connection status moves to UNREACHABLE
-func (dMgr *DeviceManager) DeleteAllDeviceFlows(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *Manager) DeleteAllDeviceFlows(ctx context.Context, parentDevice *voltha.Device) error {
logger.Debugw("delete-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
if agent := dMgr.getDeviceAgent(ctx, parentDevice.Id); agent != nil {
if err := agent.deleteAllFlows(ctx); err != nil {
@@ -1299,7 +1301,7 @@
}
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
-func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+func (dMgr *Manager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
logger.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
childDeviceIds := make([]string, 0)
if parentDevice != nil {
@@ -1313,9 +1315,9 @@
return childDeviceIds, nil
}
-//getAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
-func (dMgr *DeviceManager) getAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
- logger.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
+//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
+ logger.Debugw("GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
if parentDevice, err := dMgr.GetDevice(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
@@ -1331,7 +1333,7 @@
}
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
-func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
logger.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
logger.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -1340,13 +1342,13 @@
return nil
}
-func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ logger.Debugw("DownloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.downloadImage(ctx, img); err != nil {
- logger.Debugw("downloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("DownloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
}
} else {
@@ -1355,13 +1357,13 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("cancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ logger.Debugw("CancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.cancelImageDownload(ctx, img); err != nil {
- logger.Debugw("cancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("CancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
}
} else {
@@ -1370,13 +1372,13 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) activateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("activateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) ActivateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ logger.Debugw("ActivateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.activateImage(ctx, img); err != nil {
- logger.Debugw("activateImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("ActivateImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
}
} else {
@@ -1385,13 +1387,13 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) revertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("revertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) RevertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ logger.Debugw("RevertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.revertImage(ctx, img); err != nil {
- logger.Debugw("revertImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("RevertImage-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
}
} else {
@@ -1400,13 +1402,13 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("getImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ logger.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
var res interface{}
var err error
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
if res, err = agent.getImageDownloadStatus(ctx, img); err != nil {
- logger.Debugw("getImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("GetImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
res = err
}
} else {
@@ -1415,11 +1417,11 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) updateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
- logger.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
+ logger.Debugw("UpdateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
- logger.Debugw("updateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw("UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
return err
}
} else {
@@ -1428,23 +1430,23 @@
return nil
}
-func (dMgr *DeviceManager) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw("getImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ logger.Debugw("GetImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
return agent.getImageDownload(ctx, img)
}
return nil, status.Errorf(codes.NotFound, "%s", img.Id)
}
-func (dMgr *DeviceManager) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- logger.Debugw("listImageDownloads", log.Fields{"deviceID": deviceID})
+func (dMgr *Manager) ListImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
+ logger.Debugw("ListImageDownloads", log.Fields{"deviceID": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.listImageDownloads(ctx, deviceID)
}
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) NotifyInvalidTransition(_ context.Context, device *voltha.Device) error {
+func (dMgr *Manager) NotifyInvalidTransition(_ context.Context, device *voltha.Device) error {
logger.Errorw("NotifyInvalidTransition", log.Fields{
"device": device.Id,
"curr-admin-state": device.AdminState,
@@ -1462,14 +1464,14 @@
}
// UpdateDeviceAttribute updates value of particular device attribute
-func (dMgr *DeviceManager) UpdateDeviceAttribute(ctx context.Context, deviceID string, attribute string, value interface{}) {
+func (dMgr *Manager) UpdateDeviceAttribute(ctx context.Context, deviceID string, attribute string, value interface{}) {
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
- agent.(*DeviceAgent).updateDeviceAttribute(ctx, attribute, value)
+ agent.(*Agent).updateDeviceAttribute(ctx, attribute, value)
}
}
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
-func (dMgr *DeviceManager) GetParentDeviceID(ctx context.Context, deviceID string) string {
+func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
if device, _ := dMgr.GetDevice(ctx, deviceID); device != nil {
logger.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
return device.ParentId
@@ -1477,33 +1479,33 @@
return ""
}
-func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
- logger.Debugw("simulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
+func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
+ logger.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
"PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
"Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
res = agent.simulateAlarm(ctx, simulatereq)
- logger.Debugw("simulateAlarm-result", log.Fields{"result": res})
+ logger.Debugw("SimulateAlarm-result", log.Fields{"result": res})
}
//TODO CLI always get successful response
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) updateDeviceReason(ctx context.Context, deviceID string, reason string) error {
- logger.Debugw("updateDeviceReason", log.Fields{"deviceid": deviceID, "reason": reason})
+func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
+ logger.Debugw("UpdateDeviceReason", log.Fields{"deviceid": deviceID, "reason": reason})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceReason(ctx, reason)
}
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *DeviceManager) enablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
- logger.Debugw("enablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+ logger.Debugw("EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
res = agent.enablePort(ctx, port)
- logger.Debugw("enablePort-result", log.Fields{"result": res})
+ logger.Debugw("EnablePort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
@@ -1511,12 +1513,12 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) disablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
- logger.Debugw("disablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+ logger.Debugw("DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
var res interface{}
if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
res = agent.disablePort(ctx, port)
- logger.Debugw("disablePort-result", log.Fields{"result": res})
+ logger.Debugw("DisablePort-result", log.Fields{"result": res})
} else {
res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
@@ -1524,8 +1526,8 @@
sendResponse(ctx, ch, res)
}
-// childDeviceLost calls parent adapter to delete child device and all its references
-func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
+// ChildDeviceLost calls parent adapter to delete child device and all its references
+func (dMgr *Manager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
logger.Debugw("childDeviceLost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
if err := parentAgent.ChildDeviceLost(ctx, curr); err != nil {
@@ -1537,7 +1539,7 @@
return nil
}
-func (dMgr *DeviceManager) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+func (dMgr *Manager) StartOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
logger.Debugw("Omci_test_Request", log.Fields{"device-id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
if agent := dMgr.getDeviceAgent(ctx, omcitestrequest.Id); agent != nil {
res, err := agent.startOmciTest(ctx, omcitestrequest)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
similarity index 78%
rename from rw_core/core/adapter_proxy.go
rename to rw_core/core/device/remote/adapter_proxy.go
index 1e18ba4..4b04ee5 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package remote
import (
"context"
@@ -27,19 +27,19 @@
// AdapterProxy represents adapter proxy attributes
type AdapterProxy struct {
+ kafka.EndpointManager
deviceTopicRegistered bool
corePairTopic string
kafkaICProxy kafka.InterContainerProxy
- endpointManager kafka.EndpointManager
}
// NewAdapterProxy will return adapter proxy instance
func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
return &AdapterProxy{
+ EndpointManager: endpointManager,
kafkaICProxy: kafkaProxy,
corePairTopic: corePairTopic,
deviceTopicRegistered: false,
- endpointManager: endpointManager,
}
}
@@ -49,7 +49,7 @@
func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
- endpoint, err := ap.endpointManager.GetEndpoint(deviceID, adapterType)
+ endpoint, err := ap.GetEndpoint(deviceID, adapterType)
if err != nil {
return nil, err
}
@@ -73,9 +73,9 @@
return respChnl, nil
}
-// adoptDevice invokes adopt device rpc
-func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
+// AdoptDevice invokes adopt device rpc
+func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
rpc := "adopt_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -90,9 +90,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// disableDevice invokes disable device rpc
-func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("disableDevice", log.Fields{"device-id": device.Id})
+// DisableDevice invokes disable device rpc
+func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
rpc := "disable_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -105,9 +105,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// reEnableDevice invokes reenable device rpc
-func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
+// ReEnableDevice invokes reenable device rpc
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
rpc := "reenable_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -120,9 +120,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// rebootDevice invokes reboot device rpc
-func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
+// RebootDevice invokes reboot device rpc
+func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
rpc := "reboot_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -135,9 +135,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// deleteDevice invokes delete device rpc
-func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
+// DeleteDevice invokes delete device rpc
+func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
rpc := "delete_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -150,9 +150,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// getOfpDeviceInfo invokes get ofp device info rpc
-func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
+// GetOfpDeviceInfo invokes get ofp device info rpc
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
rpc := "get_ofp_device_info"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -165,9 +165,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// getOfpPortInfo invokes get ofp port info rpc
-func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
- logger.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
+// GetOfpPortInfo invokes get ofp port info rpc
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
return nil, err
@@ -180,9 +180,9 @@
return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
}
-// reconcileDevice invokes reconcile device rpc
-func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
- logger.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
+// ReconcileDevice invokes reconcile device rpc
+func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
rpc := "reconcile_device"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -195,9 +195,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// downloadImage invokes download image rpc
-func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
- logger.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
+// DownloadImage invokes download image rpc
+func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "download_image"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -211,9 +211,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// getImageDownloadStatus invokes get image download status rpc
-func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
- logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
+// GetImageDownloadStatus invokes get image download status rpc
+func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "get_image_download_status"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -227,9 +227,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// cancelImageDownload invokes cancel image download rpc
-func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
- logger.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
+// CancelImageDownload invokes cancel image download rpc
+func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "cancel_image_download"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -243,9 +243,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// activateImageUpdate invokes activate image update rpc
-func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
- logger.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
+// ActivateImageUpdate invokes activate image update rpc
+func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "activate_image_update"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -259,9 +259,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// revertImageUpdate invokes revert image update rpc
-func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
- logger.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
+// RevertImageUpdate invokes revert image update rpc
+func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "revert_image_update"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -275,8 +275,8 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
- logger.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
+func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
if err != nil {
return nil, err
@@ -291,9 +291,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
-// updateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
- logger.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
+// UpdateFlowsBulk invokes update flows bulk rpc
+func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
return nil, err
@@ -309,9 +309,9 @@
return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// updateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) updateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
- logger.Debugw("updateFlowsIncremental",
+// UpdateFlowsIncremental invokes update flows incremental rpc
+func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("UpdateFlowsIncremental",
log.Fields{
"device-id": device.Id,
"flow-to-add-count": len(flowChanges.ToAdd.Items),
@@ -335,9 +335,9 @@
return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// updatePmConfigs invokes update pm configs rpc
-func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
- logger.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
+// UpdatePmConfigs invokes update pm configs rpc
+func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
return nil, err
@@ -351,9 +351,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// simulateAlarm invokes simulate alarm rpc
-func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
- logger.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
+// SimulateAlarm invokes simulate alarm rpc
+func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
rpc := "simulate_alarm"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -368,8 +368,8 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
- logger.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "disable_port"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -383,8 +383,8 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
- logger.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
+func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "enable_port"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
if err != nil {
@@ -398,9 +398,9 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
-// childDeviceLost invokes child device_lost rpc
-func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
- logger.Debugw("childDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
+// ChildDeviceLost invokes child device_lost rpc
+func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+ logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
rpc := "child_device_lost"
toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
if err != nil {
@@ -415,7 +415,7 @@
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
-func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
+func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
rpc := "start_omci_test"
toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
similarity index 94%
rename from rw_core/core/adapter_proxy_test.go
rename to rw_core/core/device/remote/adapter_proxy_test.go
index 718cc30..17627dc 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package remote
import (
"context"
@@ -121,12 +121,12 @@
type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
simpleRequests := []simpleRequest{
- ap.adoptDevice,
- ap.disableDevice,
- ap.rebootDevice,
- ap.deleteDevice,
- ap.reconcileDevice,
- ap.reEnableDevice,
+ ap.AdoptDevice,
+ ap.DisableDevice,
+ ap.RebootDevice,
+ ap.DeleteDevice,
+ ap.ReconcileDevice,
+ ap.ReEnableDevice,
}
for _, f := range simpleRequests {
// Success
@@ -166,7 +166,7 @@
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
- rpcResponse, err := ap.getOfpDeviceInfo(ctx, d)
+ rpcResponse, err := ap.GetOfpDeviceInfo(ctx, d)
assert.Nil(t, err)
response, err := waitForResponse(ctx, rpcResponse)
assert.Nil(t, err)
@@ -184,7 +184,7 @@
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
portNo := uint32(1)
- rpcResponse, err := ap.getOfpPortInfo(ctx, d, portNo)
+ rpcResponse, err := ap.GetOfpPortInfo(ctx, d, portNo)
assert.Nil(t, err)
response, err := waitForResponse(ctx, rpcResponse)
assert.Nil(t, err)
@@ -204,7 +204,7 @@
assert.Nil(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
- rpcResponse, err := ap.packetOut(ctx, adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+ rpcResponse, err := ap.PacketOut(ctx, adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
assert.Nil(t, err)
_, err = waitForResponse(ctx, rpcResponse)
assert.Nil(t, err)
@@ -213,13 +213,13 @@
func testFlowUpdates(t *testing.T) {
ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
- _, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
+ _, err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
assert.Nil(t, err)
flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
- rpcResponse, err := ap.updateFlowsIncremental(ctx, d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+ rpcResponse, err := ap.UpdateFlowsIncremental(ctx, d, flowChanges, groupChanges, &voltha.FlowMetadata{})
assert.Nil(t, err)
_, err = waitForResponse(ctx, rpcResponse)
assert.Nil(t, err)
@@ -230,7 +230,7 @@
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
- rpcResponse, err := ap.updatePmConfigs(ctx, d, &voltha.PmConfigs{})
+ rpcResponse, err := ap.UpdatePmConfigs(ctx, d, &voltha.PmConfigs{})
assert.Nil(t, err)
_, err = waitForResponse(ctx, rpcResponse)
assert.Nil(t, err)
diff --git a/rw_core/core/device/remote/common.go b/rw_core/core/device/remote/common.go
new file mode 100644
index 0000000..7383bd9
--- /dev/null
+++ b/rw_core/core/device/remote/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package remote
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "remote"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device/state_transitions.go
similarity index 79%
rename from rw_core/core/device_state_transitions.go
rename to rw_core/core/device/state_transitions.go
index 1d98fdd..f7be154 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device/state_transitions.go
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package core
+package device
import (
"context"
@@ -22,19 +22,19 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-// DeviceType mentions type of device like parent, child
-type DeviceType int32
+// deviceType mentions type of device like parent, child
+type deviceType int32
const (
- parent DeviceType = 0
- child DeviceType = 1
- any DeviceType = 2
+ parent deviceType = 0
+ child deviceType = 1
+ any deviceType = 2
)
-type MatchResult uint8
+type matchResult uint8
const (
- noMatch MatchResult = iota // current state has not match in the transition table
+ noMatch matchResult = iota // current state has not match in the transition table
currWildcardMatch // current state matches the wildcard *_UNKNOWN state in the transition table
currStateOnlyMatch // current state matches the current state and previous state matches the wildcard in the transition table
currPrevStateMatch // both current and previous states match in the transition table
@@ -42,7 +42,7 @@
// match is used to keep the current match states
type match struct {
- admin, oper, conn MatchResult
+ admin, oper, conn matchResult
}
// toInt returns an integer representing the matching level of the match (the larger the number the better)
@@ -60,8 +60,8 @@
return m.toInt() > newMatch.toInt()
}
-// DeviceState has admin, operational and connection status of device
-type DeviceState struct {
+// deviceState has admin, operational and connection status of device
+type deviceState struct {
Admin voltha.AdminState_Types
Connection voltha.ConnectStatus_Types
Operational voltha.OperStatus_Types
@@ -72,9 +72,9 @@
// Transition represent transition related attributes
type Transition struct {
- deviceType DeviceType
- previousState DeviceState
- currentState DeviceState
+ deviceType deviceType
+ previousState deviceState
+ currentState deviceState
handlers []TransitionHandler
}
@@ -93,157 +93,157 @@
transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
handlers: []TransitionHandler{dMgr.CreateLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
handlers: []TransitionHandler{}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
handlers: []TransitionHandler{dMgr.SetupUNILogicalPorts}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_DISCOVERED},
handlers: []TransitionHandler{}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
handlers: []TransitionHandler{dMgr.SetupUNILogicalPorts}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.DisableAllChildDevices, dMgr.DeleteAllUNILogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteAllLogicalPorts, dMgr.DeleteLogicalDevice, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteLogicalDevice, dMgr.DeleteAllChildDevices, dMgr.DeleteAllDeviceFlows}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteLogicalDevice, dMgr.DeleteAllChildDevices, dMgr.DeleteAllDeviceFlows}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
handlers: []TransitionHandler{dMgr.CreateLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.CreateLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.ChildDeviceLost, dMgr.DeleteLogicalPorts, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.ChildDeviceLost, dMgr.DeleteLogicalPorts, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
- currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
+ currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
- previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
- previousState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ previousState: deviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.NotifyInvalidTransition}})
return &transitionMap
}
-func getDeviceStates(device *voltha.Device) *DeviceState {
- return &DeviceState{Admin: device.AdminState, Connection: device.ConnectStatus, Operational: device.OperStatus}
+func getDeviceStates(device *voltha.Device) *deviceState {
+ return &deviceState{Admin: device.AdminState, Connection: device.ConnectStatus, Operational: device.OperStatus}
}
// isMatched matches a state transition. It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, *match) {
+func getHandler(previous *deviceState, current *deviceState, transition *Transition) ([]TransitionHandler, *match) {
m := &match{}
// Do we have an exact match?
if *previous == transition.previousState && *current == transition.currentState {
@@ -297,7 +297,7 @@
}
// GetTransitionHandler returns transition handler & a flag that's set if the transition is invalid
-func (tMap *TransitionMap) GetTransitionHandler(device *voltha.Device, pState *DeviceState) []TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(device *voltha.Device, pState *deviceState) []TransitionHandler {
//1. Get the previous and current set of states
cState := getDeviceStates(device)
@@ -306,7 +306,7 @@
return nil
}
- //logger.Infow("DeviceType", log.Fields{"device": pDevice})
+ //logger.Infow("deviceType", log.Fields{"device": pDevice})
deviceType := parent
if !device.Root {
logger.Info("device is child")
diff --git a/rw_core/core/device_state_transitions_test.go b/rw_core/core/device/state_transitions_test.go
similarity index 98%
rename from rw_core/core/device_state_transitions_test.go
rename to rw_core/core/device/state_transitions_test.go
index ec93d90..0a1b43b 100644
--- a/rw_core/core/device_state_transitions_test.go
+++ b/rw_core/core/device/state_transitions_test.go
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package core
+package device
import (
"fmt"
@@ -52,21 +52,21 @@
}
}
-func getDeviceState(admin voltha.AdminState_Types, conn voltha.ConnectStatus_Types, oper voltha.OperStatus_Types) *DeviceState {
- return &DeviceState{
+func getDeviceState(admin voltha.AdminState_Types, conn voltha.ConnectStatus_Types, oper voltha.OperStatus_Types) *deviceState {
+ return &deviceState{
Admin: admin,
Connection: conn,
Operational: oper,
}
}
-func assertInvalidTransition(t *testing.T, device *voltha.Device, previousState *DeviceState) {
+func assertInvalidTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
handlers := transitionMap.GetTransitionHandler(device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.NotifyInvalidTransition).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
}
-func assertNoOpTransition(t *testing.T, device *voltha.Device, previousState *DeviceState) {
+func assertNoOpTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
handlers := transitionMap.GetTransitionHandler(device, previousState)
assert.Equal(t, 0, len(handlers))
}
@@ -213,12 +213,12 @@
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
var deleteDeviceTest = struct {
- previousStates []*DeviceState
+ previousStates []*deviceState
devices []*voltha.Device
expectedParentHandlers []TransitionHandler
expectedChildHandlers []TransitionHandler
}{
- previousStates: []*DeviceState{
+ previousStates: []*deviceState{
getDeviceState(voltha.AdminState_DISABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED),
getDeviceState(voltha.AdminState_UNKNOWN, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
getDeviceState(voltha.AdminState_DOWNLOADING_IMAGE, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN),
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
deleted file mode 100644
index cb87377..0000000
--- a/rw_core/core/id.go
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package core
-
-import (
- "errors"
- "fmt"
- "math/rand"
- "strconv"
-
- "github.com/google/uuid"
-)
-
-// CreateDeviceID produces a device ID. The device ID is a UUID
-func CreateDeviceID() string {
- return uuid.New().String()
-}
-
-// CreateLogicalDeviceID produces a logical device ID. The logical device ID is a UUID
-func CreateLogicalDeviceID() string {
- return uuid.New().String()
-}
-
-// CreateLogicalPortID produces a random port ID for a logical device.
-func CreateLogicalPortID() uint32 {
- // A logical port is a uint32
- return rand.Uint32()
-}
-
-// CreateDataPathID creates uint64 pathid from string pathid
-func CreateDataPathID(idInHexString string) (uint64, error) {
- if idInHexString == "" {
- return 0, errors.New("id-empty")
- }
- // First prepend 0x to the string
- newID := fmt.Sprintf("0x%s", idInHexString)
- d, err := strconv.ParseUint(newID, 0, 64)
- if err != nil {
- return 0, err
- }
- return d, nil
-}