[VOL-1036] Initial implementation of device lifecycle management
Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a0e25f3..bccb227 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,6 +18,7 @@
import (
"context"
"github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
ca "github.com/opencord/voltha-go/protos/core_adapter"
@@ -37,28 +38,47 @@
return &proxy
}
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+ if success {
+ return nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ }
+}
+
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
+ rpc := "adopt_device"
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "adopt_device", &topic, true, args...)
- log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success, "result": result})
- if success {
- return nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("AdoptDevice-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "disable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
}
+ success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+ log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
@@ -76,7 +96,7 @@
return nil, nil
}
-func (ap *AdapterProxy) ReconcileDevice(device voltha.Device) error {
+func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
log.Debug("ReconcileDevice")
return nil
}
@@ -86,22 +106,26 @@
return nil
}
-func (ap *AdapterProxy) DisableDevice(device voltha.Device) error {
- log.Debug("DisableDevice")
- return nil
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reenable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) ReEnableDevice(device voltha.Device) error {
- log.Debug("ReEnableDevice")
- return nil
-}
-
-func (ap *AdapterProxy) RebootDevice(device voltha.Device) error {
+func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
log.Debug("RebootDevice")
return nil
}
-func (ap *AdapterProxy) DeleteDevice(device voltha.Device) error {
+func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
log.Debug("DeleteDevice")
return nil
}
@@ -172,7 +196,7 @@
}
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
- log.Debugw("GetOfpDeviceInfo", log.Fields{"device": device})
+ log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
@@ -180,7 +204,7 @@
Value: device,
}
success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
- log.Debugw("GetOfpDeviceInfo-response", log.Fields{"device": device, "success": success, "result": result})
+ log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
unpackResult := &ca.SwitchCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
@@ -201,7 +225,7 @@
}
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
- log.Debug("GetOfpPortInfo", log.Fields{"device": device})
+ log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
@@ -215,7 +239,7 @@
}
success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
- log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "device": device, "success": success, "result": result})
+ log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
unpackResult := &ca.PortCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bfc4ee4..7ae9f1a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -25,6 +25,7 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "reflect"
)
type AdapterRequestHandlerProxy struct {
@@ -89,6 +90,24 @@
}
}
+// updatePartialDeviceData updates a subset of a device that an Adapter can update.
+// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
+func (rhp *AdapterRequestHandlerProxy) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
+ // First retrieve the most up to date device info
+ var currentDevice *voltha.Device
+ var err error
+ if currentDevice, err = rhp.deviceMgr.getDevice(device.Id); err != nil {
+ return nil, err
+ }
+ cloned := reflect.ValueOf(currentDevice).Elem().Interface().(voltha.Device)
+ cloned.Root = device.Root
+ cloned.Vendor = device.Vendor
+ cloned.Model = device.Model
+ cloned.SerialNumber = device.SerialNumber
+ cloned.MacAddress = device.MacAddress
+ return &cloned, nil
+}
+
func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -100,15 +119,21 @@
log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
- log.Debugw("DeviceUpdate", log.Fields{"device": device})
+ log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
if rhp.TestMode { // Execute only for test cases
return new(empty.Empty), nil
}
- if err := rhp.deviceMgr.updateDevice(device); err != nil {
- log.Debugw("DeviceUpdate-error", log.Fields{"device": device, "error": err})
+
+ //Merge the adapter device info (only the ones an adapter can change) with the latest device data
+ if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ } else {
+ // An adapter request needs an Ack without having to wait for the update to be
+ // completed. We therefore run the update in its own routine.
+ go rhp.deviceMgr.updateDevice(updatedDevice)
}
+
return new(empty.Empty), nil
}
@@ -137,28 +162,30 @@
err := errors.New("invalid-number-of-args")
return nil, err
}
- pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
- }
- // Porttype is an enum sent as an integer proto
+ deviceId := &voltha.ID{}
pt := &ca.IntType{}
- if err := ptypes.UnmarshalAny(args[1].Value, pt); err != nil {
- log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
- return nil, err
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_type":
+ if err := ptypes.UnmarshalAny(arg.Value, pt); err != nil {
+ log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
-
- log.Debugw("GetPorts", log.Fields{"deviceID": pID.Id, "portype": pt.Val})
-
+ log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
if rhp.TestMode { // Execute only for test cases
aPort := &voltha.Port{Label: "test_port"}
allPorts := &voltha.Ports{}
allPorts.Items = append(allPorts.Items, aPort)
return allPorts, nil
}
- return nil, nil
-
+ return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
@@ -254,29 +281,69 @@
log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
return nil, err
}
- if operStatus.Val == -1 {
- operStatus = nil
- }
+ //if operStatus.Val == -1 {
+ // operStatus = nil
+ //}
case "connect_status":
if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
- if connStatus.Val == -1 {
- connStatus = nil
- }
+ //if connStatus.Val == -1 {
+ // connStatus = nil
+ //}
}
}
-
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
-
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.updateDeviceState(deviceId.Id, operStatus, connStatus); err != nil {
- log.Debugw("DeviceUpdate-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
+
+ // When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
+ go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+ return new(empty.Empty), nil
+}
+
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+ if len(args) < 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
}
+ deviceId := &voltha.ID{}
+ portType := &ca.IntType{}
+ portNo := &ca.IntType{}
+ operStatus := &ca.IntType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "oper_status":
+ if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+ log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_type":
+ if err := ptypes.UnmarshalAny(arg.Value, portType); err != nil {
+ log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_no":
+ if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+ log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
+ return nil, err
+ }
+
+ }
+ }
+ log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+ if rhp.TestMode { // Execute only for test cases
+ return nil, nil
+ }
+ go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val), voltha.OperStatus_OperStatus(operStatus.Val))
return new(empty.Empty), nil
}
@@ -309,10 +376,14 @@
return nil, nil
}
- if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
+ // Run port creation in its own go routine
+ go rhp.deviceMgr.addPort(deviceId.Id, port)
+
+ //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+ // log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
+ // return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ //}
+ // Return an Ack
return new(empty.Empty), nil
}
@@ -346,10 +417,14 @@
return nil, nil
}
- if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
+ // Run PM config update in its own go routine
+ go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+
+ //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+ // log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
+ // return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ //}
+ // Return an Ack
return new(empty.Empty), nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 06f3ca3..480e32f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -24,7 +24,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc"
- "reflect"
)
type Core struct {
@@ -35,8 +34,8 @@
grpcNBIAPIHanfler *APIHandler
config *config.RWCoreFlags
kmp *kafka.KafkaMessagingProxy
- clusterDataRoot *model.Root
- localDataRoot *model.Root
+ clusterDataRoot model.Root
+ localDataRoot model.Root
clusterDataProxy *model.Proxy
localDataProxy *model.Proxy
exitChannel chan int
@@ -52,10 +51,10 @@
core.exitChannel = make(chan int, 1)
core.config = cf
// TODO: Setup the KV store
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
- core.clusterDataProxy = core.clusterDataRoot.Node.GetProxy("/", false)
- core.localDataProxy = core.localDataRoot.Node.GetProxy("/", false)
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+ core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
+ core.localDataProxy = core.localDataRoot.GetProxy("/", false)
return &core
}
@@ -79,7 +78,7 @@
log.Info("core-stopped")
}
-//startGRPCService creates the grpc service handler, registers it to the grpc server
+//startGRPCService creates the grpc service handlers, registers it to the grpc server
// and starts the server
func (core *Core) startGRPCService(ctx context.Context) {
// create an insecure gserver server
@@ -129,7 +128,7 @@
requestProxy := NewAdapterRequestHandlerProxy(dMgr, ldMgr, cdProxy, ldProxy)
core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
- log.Info("request-handler")
+ log.Info("request-handlers")
return nil
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 805dd21..d9dacbc 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,6 +17,9 @@
import (
"context"
+ "reflect"
+ "sync"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
@@ -24,7 +27,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
)
type DeviceAgent struct {
@@ -33,64 +35,160 @@
adapterProxy *AdapterProxy
deviceMgr *DeviceManager
clusterDataProxy *model.Proxy
+ deviceProxy *model.Proxy
exitChannel chan int
+ lockDevice sync.RWMutex
}
func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
var agent DeviceAgent
- device.Id = CreateDeviceId()
- agent.deviceId = device.Id
agent.adapterProxy = ap
- agent.lastData = device
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ cloned.Id = CreateDeviceId()
+ cloned.AdminState = voltha.AdminState_PREPROVISIONED
+ agent.deviceId = cloned.Id
+ agent.lastData = cloned
agent.deviceMgr = deviceMgr
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
+ agent.lockDevice = sync.RWMutex{}
return &agent
}
func (agent *DeviceAgent) start(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
// Add the initial device to the local model
if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
+ agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
+ //agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
+ agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
log.Debug("device-agent-started")
}
func (agent *DeviceAgent) Stop(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
agent.exitChannel <- 1
log.Debug("device-agent-stopped")
}
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// This function is meant so that we do not have duplicate code all over the device agent functions
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
- log.Debugw("enableDevice", log.Fields{"id": agent.lastData.Id, "device": agent.lastData})
- // Update the device status
- if device, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- cloned.AdminState = voltha.AdminState_ENABLED
- cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
- } else {
- if err := agent.adapterProxy.AdoptDevice(ctx, &cloned); err != nil {
- log.Debugw("enableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ if device.AdminState == voltha.AdminState_ENABLED {
+ log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ return nil
+ }
+ // Verify whether we need to adopt the device the first time
+ // TODO: A state machine for these state transitions would be better (we just have to handle
+ // a limited set of states now or it may be an overkill)
+ if device.AdminState == voltha.AdminState_PREPROVISIONED {
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
+ log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
}
- agent.lastData = &cloned
+ } else {
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
+ log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ }
+ // Received an Ack (no error found above). Now update the device in the model to the expected state
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_ENABLED
+ cloned.OperStatus = voltha.OperStatus_ACTIVATING
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
return nil
}
-func (agent *DeviceAgent) getNNIPorts(ctx context.Context) *voltha.Ports {
- log.Debugw("getNNIPorts", log.Fields{"id": agent.deviceId})
+func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
+ log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState == voltha.AdminState_DISABLED {
+ log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ agent.lockDevice.Unlock()
+ return nil
+ }
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
+ log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ agent.lockDevice.Unlock()
+ return err
+ }
+ // Received an Ack (no error found above). Now update the device in the model to the expected state
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_DISABLED
+ // Set the state of all ports on that device to disable
+ for _, port := range cloned.Ports {
+ port.AdminState = voltha.AdminState_DISABLED
+ port.OperStatus = voltha.OperStatus_UNKNOWN
+ }
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ agent.lockDevice.Unlock()
+ //TODO: callback will be invoked to handle this state change
+ //For now force the state transition to happen
+ if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+ log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+ return err
+ }
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
ports := &voltha.Ports{}
if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
for _, port := range device.Ports {
- if port.Type == voltha.Port_ETHERNET_NNI {
+ if port.Type == portType {
ports.Items = append(ports.Items, port)
}
}
@@ -128,15 +226,25 @@
}
}
+func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+ log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
+ log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+ return nil
+}
+
func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
// Get the dev info from the model
- if storedData, err := agent.deviceMgr.getDevice(device.Id); err != nil {
+ if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", device.Id)
} else {
// store the changed data
- cloned := (proto.Clone(device)).(*voltha.Device)
+ cloned := proto.Clone(device).(*voltha.Device)
afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ agent.lockDevice.Unlock()
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -149,26 +257,77 @@
}
}
-func (agent *DeviceAgent) updateDeviceState(operState *core_adapter.IntType, connState *core_adapter.IntType) error {
+func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- if operState != nil {
- cloned.OperStatus = voltha.OperStatus_OperStatus(operState.Val)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+ if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
+ log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ cloned.ConnectStatus = connStatus
}
- if connState != nil {
- cloned.ConnectStatus = voltha.ConnectStatus_ConnectStatus(connState.Val)
+ if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
+ log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ cloned.OperStatus = operStatus
}
- log.Debugw("DeviceStateUpdate-device", log.Fields{"device": cloned})
+ log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
+ agent.lockDevice.Unlock()
// Perform the state transition
- if err := agent.deviceMgr.processTransition(storeDevice, &cloned); err != nil {
+ if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
+ log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
+ return err
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
+ // Work only on latest data
+ // TODO: Get list of ports from device directly instead of the entire device
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+ if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.InvalidArgument, "%s", portType)
+ }
+ for _, port := range cloned.Ports {
+ if port.Type == portType && port.PortNo == portNo {
+ port.OperStatus = operStatus
+ // Set the admin status to ENABLED if the operational status is ACTIVE
+ // TODO: Set by northbound system?
+ if operStatus == voltha.OperStatus_ACTIVE {
+ port.AdminState = voltha.AdminState_ENABLED
+ }
+ break
+ }
+ }
+ log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+ // Store the device
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ agent.lockDevice.Unlock()
+ // Perform the state transition
+ if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
return err
}
@@ -177,17 +336,18 @@
}
func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debug("updatePmConfigs")
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- cp := proto.Clone(pmConfigs)
- cloned.PmConfigs = cp.(*voltha.PmConfigs)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -196,21 +356,57 @@
}
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
- log.Debug("addPort")
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
if cloned.Ports == nil {
// First port
+ log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
cloned.Ports = make([]*voltha.Port, 0)
}
- cp := proto.Clone(port)
- cloned.Ports = append(cloned.Ports, cp.(*voltha.Port))
+ cp := proto.Clone(port).(*voltha.Port)
+ // Set the admin state of the port to ENABLE if the operational state is ACTIVE
+ // TODO: Set by northbound system?
+ if cp.OperStatus == voltha.OperStatus_ACTIVE {
+ cp.AdminState = voltha.AdminState_ENABLED
+ }
+ cloned.Ports = append(cloned.Ports, cp)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("addPeerPort")
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Get the peer port on the device based on the port no
+ for _, peerPort := range cloned.Ports {
+ if peerPort.PortNo == port.PortNo { // found port
+ cp := proto.Clone(port).(*voltha.Port_PeerPort)
+ peerPort.Peers = append(peerPort.Peers, cp)
+ log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
+ break
+ }
+ }
+ // Store the device
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -220,12 +416,14 @@
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
if value == nil {
return
}
var storeDevice *voltha.Device
var err error
- if storeDevice, err = agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
return
}
updated := false
@@ -247,10 +445,10 @@
}
}
}
- log.Debugw("update-field-status", log.Fields{"device": storeDevice, "name": name, "updated": updated})
+ log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
// Save the data
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index fd18c10..e3dbed2 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -18,6 +18,7 @@
import (
"context"
"errors"
+ "github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
@@ -95,18 +96,18 @@
}
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
- log.Debugw("createDevice-start", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+ log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
agent.start(ctx)
- sendResponse(ctx, ch, nil)
+ sendResponse(ctx, ch, agent.lastData)
}
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- log.Debugw("enableDevice-start", log.Fields{"deviceid": id})
+ log.Debugw("enableDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
@@ -119,33 +120,44 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
- log.Debugw("getDevice-start", log.Fields{"deviceid": id})
+func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ log.Debugw("disableDevice", log.Fields{"deviceid": id})
- if device := dMgr.clusterDataProxy.Get("/devices/"+id, 1, false, ""); device == nil {
- return nil, status.Errorf(codes.NotFound, "%s", id)
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ res = agent.disableDevice(ctx)
+ log.Debugw("disableDevice-result", log.Fields{"result": res})
} else {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- return &cloned, nil
+ res = status.Errorf(codes.NotFound, "%s", id.Id)
}
+
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
+ log.Debugw("getDevice", log.Fields{"deviceid": id})
+ if agent := dMgr.getDeviceAgent(id); agent != nil {
+ return agent.getDevice()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
}
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
- log.Debug("ListDevices-start")
+ log.Debug("ListDevices")
result := &voltha.Devices{}
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
for _, agent := range dMgr.deviceAgents {
- if device := dMgr.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- result.Items = append(result.Items, &cloned)
+ if device, err := agent.getDevice(); err == nil {
+ cloned := proto.Clone(device).(*voltha.Device)
+ result.Items = append(result.Items, cloned)
}
}
return result, nil
}
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
- log.Debugw("updateDevice-start", log.Fields{"deviceid": device.Id, "device": device})
+ log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
return agent.updateDevice(device)
@@ -155,9 +167,23 @@
func (dMgr *DeviceManager) addPort(deviceId string, port *voltha.Port) error {
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.addPort(port)
+ if err := agent.addPort(port); err != nil {
+ return err
+ }
+ // Setup peer ports
+ meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceId, PortNo: port.PortNo}
+ for _, peerPort := range port.Peers {
+ if agent := dMgr.getDeviceAgent(peerPort.DeviceId); agent != nil {
+ if err := agent.addPeerPort(meAsPeer); err != nil {
+ log.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
+ return err
+ }
+ }
+ }
+ return nil
+ } else {
+ return status.Errorf(codes.NotFound, "%s", deviceId)
}
- return status.Errorf(codes.NotFound, "%s", deviceId)
}
func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
@@ -168,7 +194,7 @@
}
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
- log.Debugw("getSwitchCapability-start", log.Fields{"deviceid": deviceId})
+ log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -176,17 +202,18 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) getNNIPorts(ctx context.Context, deviceId string) (*voltha.Ports, error) {
- log.Debugw("getNNIPorts-start", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+ log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.getNNIPorts(ctx), nil
+ return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
}
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
- log.Debugw("getPortCapability-start", log.Fields{"deviceid": deviceId})
+ log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -194,20 +221,27 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) updateDeviceState(deviceId string, operState *core_adapter.IntType, connState *core_adapter.IntType) error {
- log.Debugw("updateDeviceState-start", log.Fields{"deviceid": deviceId, "operState": operState, "connState": connState})
+func (dMgr *DeviceManager) updateDeviceStatus(deviceId string, operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ log.Debugw("updateDeviceStatus", log.Fields{"deviceid": deviceId, "operStatus": operStatus, "connStatus": connStatus})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateDeviceState(operState, connState)
+ return agent.updateDeviceStatus(operStatus, connStatus)
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+func (dMgr *DeviceManager) updatePortState(deviceId string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+ log.Debugw("updatePortState", log.Fields{"deviceid": deviceId, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.updatePortState(portType, portNo, operStatus)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
func (dMgr *DeviceManager) childDeviceDetected(parentDeviceId string, parentPortNo int64, deviceType string, channelId int64) error {
- log.Debugw("childDeviceDetected-start", log.Fields{"parentDeviceId": parentDeviceId})
+ log.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceId})
// Create the ONU device
childDevice := &voltha.Device{}
- childDevice.Id = CreateDeviceId()
childDevice.Type = deviceType
childDevice.ParentId = parentDeviceId
childDevice.ParentPortNo = uint32(parentPortNo)
@@ -220,7 +254,7 @@
agent.start(nil)
// Activate the child device
- if agent := dMgr.getDeviceAgent(childDevice.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
return agent.enableDevice(nil)
}
@@ -229,16 +263,25 @@
func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {
// This will be triggered on every update to the device.
- handler := dMgr.stateTransitions.GetTransitionHandler(previous, current)
- if handler != nil {
- log.Debugw("found-handler", log.Fields{"handler": funcName(handler)})
- return handler(previous, current)
+ handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
+ if handlers == nil {
+ log.Debugw("handlers-not-found", log.Fields{"deviceId": current.Id})
+ return nil
}
- log.Debugw("handler-not-found", log.Fields{"deviceId": current.Id})
+ for _, handler := range handlers {
+ log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(current); err != nil {
+ return err
+ }
+ }
+ //if handler != nil {
+ // log.Debugw("found-handlers", log.Fields{"handlers": funcName(handler)})
+ // return handler(current)
+ //}
return nil
}
-func (dMgr *DeviceManager) createLogicalDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) createLogicalDevice(cDevice *voltha.Device) error {
log.Info("createLogicalDevice")
var logicalId *string
var err error
@@ -251,7 +294,80 @@
return nil
}
-func (dMgr *DeviceManager) addUNILogicalPort(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) deleteLogicalDevice(cDevice *voltha.Device) error {
+ log.Info("deleteLogicalDevice")
+ var err error
+ if err = dMgr.logicalDeviceMgr.DeleteLogicalDevice(nil, cDevice); err != nil {
+ log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
+ return err
+ }
+ // Remove the logical device Id from the parent device
+ logicalId := ""
+ dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+ return nil
+}
+
+func (dMgr *DeviceManager) deleteLogicalPort(cDevice *voltha.Device) error {
+ log.Info("deleteLogicalPort")
+ var err error
+ if err = dMgr.logicalDeviceMgr.DeleteLogicalPort(nil, cDevice); err != nil {
+ log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": cDevice.Id})
+ return err
+ }
+ //// Remove the logical device Id from the parent device
+ //logicalId := ""
+ //dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+ return nil
+}
+
+func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+ // Sanity check
+ if childDevice.Root {
+ // childDevice is the parent device
+ return childDevice
+ }
+ parentDevice, _ := dMgr.getDevice(childDevice.ParentId)
+ return parentDevice
+}
+
+func (dMgr *DeviceManager) disableAllChildDevices(cDevice *voltha.Device) error {
+ log.Debug("disableAllChildDevices")
+ var childDeviceIds []string
+ var err error
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(cDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ }
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"deviceId": cDevice.Id})
+ }
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.disableDevice(nil); err != nil {
+ log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ }
+ }
+ }
+ return nil
+}
+
+func (dMgr *DeviceManager) getAllChildDeviceIds(cDevice *voltha.Device) ([]string, error) {
+ log.Info("getAllChildDeviceIds")
+ // Get latest device info
+ var device *voltha.Device
+ var err error
+ if device, err = dMgr.getDevice(cDevice.Id); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ }
+ childDeviceIds := make([]string, 0)
+ for _, port := range device.Ports {
+ for _, peer := range port.Peers {
+ childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ }
+ }
+ return childDeviceIds, nil
+}
+
+func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.AddUNILogicalPort(nil, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -260,32 +376,32 @@
return nil
}
-func (dMgr *DeviceManager) activateDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
log.Info("activateDevice")
return nil
}
-func (dMgr *DeviceManager) disableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
- log.Info("disableDevice")
+func (dMgr *DeviceManager) disableDeviceHandler(cDevice *voltha.Device) error {
+ log.Info("disableDevice-donothing")
return nil
}
-func (dMgr *DeviceManager) abandonDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) abandonDevice(cDevice *voltha.Device) error {
log.Info("abandonDevice")
return nil
}
-func (dMgr *DeviceManager) reEnableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) reEnableDevice(cDevice *voltha.Device) error {
log.Info("reEnableDevice")
return nil
}
-func (dMgr *DeviceManager) noOp(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) noOp(cDevice *voltha.Device) error {
log.Info("noOp")
return nil
}
-func (dMgr *DeviceManager) notAllowed(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
log.Info("notAllowed")
return errors.New("Transition-not-allowed")
}
@@ -304,7 +420,7 @@
func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
if device, _ := dMgr.getDevice(deviceId); device != nil {
- log.Infow("GetParentDeviceId", log.Fields{"device": device})
+ log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
return &device.ParentId
}
return nil
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 0f0239c..a84f03a 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -34,13 +34,13 @@
Operational voltha.OperStatus_OperStatus
}
-type TransitionHandler func(*voltha.Device, *voltha.Device) error
+type TransitionHandler func(*voltha.Device) error
type Transition struct {
deviceType DeviceType
previousState DeviceState
currentState DeviceState
- handler TransitionHandler
+ handlers []TransitionHandler
}
type TransitionMap struct {
@@ -55,88 +55,93 @@
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.activateDevice})
+ handlers: []TransitionHandler{dMgr.activateDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.activateDevice})
+ handlers: []TransitionHandler{dMgr.activateDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- handler: dMgr.createLogicalDevice})
+ handlers: []TransitionHandler{dMgr.createLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- handler: dMgr.addUNILogicalPort})
+ handlers: []TransitionHandler{dMgr.addUNILogicalPort}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.disableDevice})
-
+ handlers: []TransitionHandler{dMgr.deleteLogicalDevice, dMgr.disableAllChildDevices}})
+ transitionMap.transitions = append(transitionMap.transitions,
+ Transition{
+ deviceType: child,
+ previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ handlers: []TransitionHandler{dMgr.deleteLogicalPort}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.abandonDevice})
+ handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.abandonDevice})
+ handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.reEnableDevice})
+ handlers: []TransitionHandler{dMgr.reEnableDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
return &transitionMap
}
@@ -146,38 +151,38 @@
}
// isMatched matches a state transition. It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) (TransitionHandler, bool) {
+func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, bool) {
// Do we have an exact match?
if *previous == transition.previousState && *current == transition.currentState {
- return transition.handler, true
+ return transition.handlers, true
}
// If the admin state changed then prioritize it first
if previous.Admin != current.Admin {
if previous.Admin == transition.previousState.Admin && current.Admin == transition.currentState.Admin {
- return transition.handler, false
+ return transition.handlers, false
}
}
// If the operational state changed then prioritize it in second position
if previous.Operational != current.Operational {
if previous.Operational == transition.previousState.Operational && current.Operational == transition.currentState.Operational {
- return transition.handler, false
+ return transition.handlers, false
}
}
// If the connection state changed then prioritize it in third position
if previous.Connection != current.Connection {
if previous.Connection == transition.previousState.Connection && current.Connection == transition.currentState.Connection {
- return transition.handler, false
+ return transition.handlers, false
}
}
return nil, false
}
-func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) []TransitionHandler {
//1. Get the previous and current set of states
pState := getDeviceStates(pDevice)
cState := getDeviceStates(cDevice)
- log.Infow("DeviceType", log.Fields{"device": pDevice})
+ //log.Infow("DeviceType", log.Fields{"device": pDevice})
deviceType := parent
if !pDevice.Root {
log.Info("device is child")
@@ -186,8 +191,8 @@
log.Infof("deviceType:%d-deviceId:%s-previous:%v-current:%v", deviceType, pDevice.Id, pState, cState)
//2. Go over transition array to get the right transition
- var currentMatch TransitionHandler
- var tempHandler TransitionHandler
+ var currentMatch []TransitionHandler
+ var tempHandler []TransitionHandler
var exactMatch bool
var deviceTypeMatch bool
for _, aTransition := range tMap.transitions {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index bd28322..d446438 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -48,19 +48,14 @@
func (handler *APIHandler) UpdateLogLevel(ctx context.Context, logging *voltha.Logging) (*empty.Empty, error) {
log.Debugw("UpdateLogLevel-request", log.Fields{"newloglevel": logging.Level, "intval": int(logging.Level)})
- if isTestMode(ctx) {
- out := new(empty.Empty)
- log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
- return out, nil
- }
- return nil, errors.New("Unimplemented")
-
+ out := new(empty.Empty)
+ log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
+ return out, nil
}
func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan error) {
log.Debugw("processEnableDevicePort", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch <- status.Errorf(100, "%d-%s", 100, "erreur")
-
}
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
@@ -141,15 +136,17 @@
go handler.deviceMgr.createDevice(ctx, device, ch)
select {
case res := <-ch:
- if res == nil {
- return &voltha.Device{Id: device.Id}, nil
- } else if err, ok := res.(error); ok {
- return &voltha.Device{Id: device.Id}, err
- } else {
- log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return &voltha.Device{Id: device.Id}, err
+ if res != nil {
+ if err, ok := res.(error); ok {
+ return &voltha.Device{}, err
+ }
+ if d, ok := res.(*voltha.Device); ok {
+ return d, nil
+ }
}
+ log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
+ err := status.Errorf(codes.Internal, "%s", res)
+ return &voltha.Device{}, err
case <-ctx.Done():
log.Debug("createdevice-client-timeout")
return nil, ctx.Err()
@@ -188,6 +185,24 @@
out := new(empty.Empty)
return out, nil
}
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.disableDevice(ctx, id, ch)
+ select {
+ case res := <-ch:
+ if res == nil {
+ return new(empty.Empty), nil
+ } else if err, ok := res.(error); ok {
+ return new(empty.Empty), err
+ } else {
+ log.Warnw("disable-device-unexpected-return-type", log.Fields{"result": res})
+ err = status.Errorf(codes.Internal, "%s", res)
+ return new(empty.Empty), err
+ }
+ case <-ctx.Done():
+ log.Debug("enabledevice-client-timeout")
+ return nil, ctx.Err()
+ }
return nil, errors.New("Unimplemented")
}
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
index 6a32ee5..21300cc 100644
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_client_test.go
@@ -39,7 +39,6 @@
Prerequite: These tests require the rw_core to run prior to executing these test cases.
*/
-
func setup() {
var err error
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 117c869..218478f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -25,17 +25,18 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
+ "sync"
)
type LogicalDeviceAgent struct {
- logicalDeviceId string
- lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- exitChannel chan int
+ logicalDeviceId string
+ lastData *voltha.LogicalDevice
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ lockLogicalDevice sync.RWMutex
}
func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
@@ -47,11 +48,12 @@
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
+ agent.lockLogicalDevice = sync.RWMutex{}
return &agent
}
func (agent *LogicalDeviceAgent) Start(ctx context.Context) error {
- log.Info("starting-logical_device-agent")
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
var switchCap *ca.SwitchCapability
var err error
@@ -68,7 +70,7 @@
//hence. may need to extract the port by the NNI port id defined by the adapter during device
//creation
var nniPorts *voltha.Ports
- if nniPorts, err = agent.deviceMgr.getNNIPorts(ctx, agent.rootDeviceId); err != nil {
+ if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
}
var portCap *ca.PortCapability
@@ -80,8 +82,11 @@
}
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ lp.DeviceId = agent.rootDeviceId
ld.Ports = append(ld.Ports, lp)
}
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
// Save the logical device
if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -92,8 +97,30 @@
return nil
}
+func (agent *LogicalDeviceAgent) getLogicalDevice() (*voltha.LogicalDevice, error) {
+ log.Debug("getLogicalDevice")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+ return cloned, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
+ log.Debug("getLogicalDeviceWithoutLock")
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+ return cloned, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, portNo uint32) error {
- log.Info("addUNILogicalPort-start")
+ log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var portCap *ca.PortCapability
var err error
@@ -101,29 +128,67 @@
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
return err
}
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
// Get stored logical device
- if ldevice, err := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); err != nil {
+ if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
return status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
- cloned := reflect.ValueOf(ldevice).Elem().Interface().(voltha.LogicalDevice)
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
+ lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
+ lp.DeviceId = childDevice.Id
cloned.Ports = append(cloned.Ports, lp)
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, &cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-add-UNI-port:%s", agent.logicalDeviceId)
- }
+ return agent.updateLogicalDeviceWithoutLock(cloned)
+ }
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
+ cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
+ afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
+ }
+ return nil
+}
+
+// deleteLogicalPort removes the logical port associated with a child device
+func (agent *LogicalDeviceAgent) deleteLogicalPort(device *voltha.Device) error {
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Get the most up to date logical device
+ var logicaldevice *voltha.LogicalDevice
+ if logicaldevice, _ = agent.getLogicalDeviceWithoutLock(); logicaldevice == nil {
+ log.Debugw("no-logical-device", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "deviceId": device.Id})
return nil
}
+ index := -1
+ for i, logicalPort := range logicaldevice.Ports {
+ if logicalPort.DeviceId == device.Id {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ copy(logicaldevice.Ports[index:], logicaldevice.Ports[index+1:])
+ logicaldevice.Ports[len(logicaldevice.Ports)-1] = nil
+ logicaldevice.Ports = logicaldevice.Ports[:len(logicaldevice.Ports)-1]
+ log.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return agent.updateLogicalDeviceWithoutLock(logicaldevice)
+ }
+ return nil
}
func (agent *LogicalDeviceAgent) Stop(ctx context.Context) {
log.Info("stopping-logical_device-agent")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ //Remove the logical device from the model
+ if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
agent.exitChannel <- 1
log.Info("logical_device-agent-stopped")
}
-
-func (agent *LogicalDeviceAgent) getLogicalDevice(ctx context.Context) *voltha.LogicalDevice {
- log.Debug("getLogicalDevice")
- cp := proto.Clone(agent.lastData)
- return cp.(*voltha.LogicalDevice)
-}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index aa22d57..bef078c 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -24,7 +24,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
"strings"
"sync"
)
@@ -78,33 +77,36 @@
return nil
}
+func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+}
+
+// getLogicalDevice provides a cloned most up to date logical device
func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
- log.Debugw("getlogicalDevice-start", log.Fields{"logicaldeviceid": id})
- logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+id, 1, false, "")
- if logicalDevice != nil {
- cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
- return &cloned, nil
+ log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.getLogicalDevice()
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
- log.Debug("listLogicalDevices-start")
+ log.Debug("listLogicalDevices")
result := &voltha.LogicalDevices{}
ldMgr.lockLogicalDeviceAgentsMap.Lock()
defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
for _, agent := range ldMgr.logicalDeviceAgents {
- logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
- if logicalDevice != nil {
- cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
- result.Items = append(result.Items, &cloned)
+ if lDevice, err := agent.getLogicalDevice(); err == nil {
+ result.Items = append(result.Items, lDevice)
}
}
return result, nil
}
func (ldMgr *LogicalDeviceManager) CreateLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
- log.Infow("creating-logical-device-start", log.Fields{"deviceId": device.Id})
+ log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
return nil, errors.New("Device-not-root")
@@ -116,18 +118,74 @@
// in the Device model. May need to be moved out.
macAddress := device.MacAddress
id := strings.Replace(macAddress, ":", "", -1)
- log.Debugw("setting-logical-device-id", log.Fields{"logicaldeviceId": id})
+ if id == "" {
+ log.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("mac-address-not-set")
+ }
+ log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.Start(ctx)
- log.Info("creating-logical-device-ends")
+ log.Debug("creating-logical-device-ends")
return &id, nil
}
+func (ldMgr *LogicalDeviceManager) DeleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
+ // Sanity check
+ if !device.Root {
+ return errors.New("Device-not-root")
+ }
+ logDeviceId := device.ParentId
+ if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
+ // Stop the logical device agent
+ agent.Stop(ctx)
+ //Remove the logical device agent from the Map
+ ldMgr.deleteLogicalDeviceAgent(logDeviceId)
+ }
+
+ log.Debug("deleting-logical-device-ends")
+ return nil
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
+ // Device can either be a parent or a child device
+ if device.Root {
+ // Parent device. The ID of a parent device is the logical device ID
+ return &device.ParentId, nil
+ }
+ // Device is child device
+ // retrieve parent device using child device ID
+ if parentDevice := ldMgr.deviceMgr.getParentDevice(device); parentDevice != nil {
+ return &parentDevice.ParentId, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+// DeleteLogicalDevice removes the logical port associated with a child device
+func (ldMgr *LogicalDeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("deleting-logical-port", log.Fields{"deviceId": device.Id})
+ // Sanity check
+ if device.Root {
+ return errors.New("Device-root")
+ }
+ logDeviceId, _ := ldMgr.getLogicalDeviceId(device)
+ if logDeviceId == nil {
+ log.Debugw("no-logical-device-present", log.Fields{"deviceId": device.Id})
+ return nil
+ }
+ if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+ agent.deleteLogicalPort(device)
+ }
+
+ log.Debug("deleting-logical-port-ends")
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) AddUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
- log.Infow("AddUNILogicalPort-start", log.Fields{"deviceId": childDevice.Id})
+ log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -137,7 +195,7 @@
parentId := childDevice.ParentId
logDeviceId := ldMgr.deviceMgr.GetParentDeviceId(parentId)
- log.Infow("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
+ log.Debugw("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
return agent.addUNILogicalPort(ctx, childDevice, childDevice.ProxyAddress.ChannelId)