[VOL-1036] Device management implementation. This update includes
the the ability to reboot and delete a device. It contains changes
to both the Go Core and the Twisted ponsim adapters.
Change-Id: I15539827c654d7186cdae3300a107ffc8e921756
diff --git a/adapters/iadapter.py b/adapters/iadapter.py
index 3388d5a..8cdcd6e 100644
--- a/adapters/iadapter.py
+++ b/adapters/iadapter.py
@@ -109,18 +109,20 @@
def disable_device(self, device):
log.info('disable-device', device_id=device.id)
- reactor.callLater(1, self.devices_handlers[device.id].disable)
- log.debug('disable_device_done', device_id=device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].disable)
+ log.debug('disable-device-done', device_id=device.id)
return device
def reenable_device(self, device):
log.info('reenable-device', device_id=device.id)
reactor.callLater(0, self.devices_handlers[device.id].reenable)
+ log.info('reenable-device-done', device_id=device.id)
return device
def reboot_device(self, device):
log.info('reboot-device', device_id=device.id)
reactor.callLater(0, self.devices_handlers[device.id].reboot)
+ log.info('reboot-device-done', device_id=device.id)
return device
def download_image(self, device, request):
@@ -139,14 +141,15 @@
raise NotImplementedError()
def self_test_device(self, device):
- log.info('self-test-req', device_id=device.id)
+ log.info('self-test', device_id=device.id)
result = reactor.callLater(0, self.devices_handlers[device.id].self_test_device)
+ log.info('self-test-done', device_id=device.id)
return result
def delete_device(self, device):
log.info('delete-device', device_id=device.id)
- # TODO: Update the logical device mapping
reactor.callLater(0, self.devices_handlers[device.id].delete)
+ log.info('delete-device-done', device_id=device.id)
return device
def get_device_details(self, device):
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 2517a31..f4898a3 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -156,10 +156,12 @@
return self.adapter.self_test_device(device)
def delete_device(self, device):
- # Remove all child devices
- self.delete_all_child_devices(device.id)
-
- return self.adapter.delete_device(device)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return (True, self.adapter.delete_device(d))
+ else:
+ return (False, d)
def get_device_details(self, device):
return self.adapter.get_device_details(device)
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 32a4c9d..512262f 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -259,6 +259,31 @@
connect_status=c_status)
returnValue(res)
+
+ @wrap_request(None)
+ @inlineCallbacks
+ def children_state_update(self, device_id,
+ oper_status=None,
+ connect_status=None):
+ id = ID()
+ id.id = device_id
+ o_status = IntType()
+ if oper_status or oper_status==OperStatus.UNKNOWN:
+ o_status.val = oper_status
+ else:
+ o_status.val = -1
+ c_status = IntType()
+ if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ c_status.val = connect_status
+ else:
+ c_status.val = -1
+
+ res = yield self.invoke(rpc="ChildrenStateUpdate",
+ device_id=id,
+ oper_status=o_status,
+ connect_status=c_status)
+ returnValue(res)
+
@wrap_request(None)
@inlineCallbacks
def port_state_update(self,
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 95bafaa..e47d14d 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -48,6 +48,11 @@
from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability
from adapters.common.utils.registry import registry
+from adapters.kafka.kafka_proxy import get_kafka_proxy
+from simplejson import dumps
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+
_ = third_party
log = structlog.get_logger()
@@ -370,6 +375,7 @@
)
)
+ # TODO - change for core 2.0
def reconcile(self, device):
self.log.info('reconciling-OLT-device-starts')
@@ -537,34 +543,23 @@
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
- # Update the operational status to ACTIVATING and connect status to
- # UNREACHABLE
- device = self.adapter_agent.get_device(self.device_id)
- previous_oper_status = device.oper_status
- previous_conn_status = device.connect_status
- device.oper_status = OperStatus.ACTIVATING
- device.connect_status = ConnectStatus.UNREACHABLE
- self.adapter_agent.device_update(device)
+ yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
- # Update the child devices connect state to UNREACHABLE
- self.adapter_agent.update_child_devices_state(self.device_id,
+ # Update the child devices connect state to UNREACHABLE
+ yield self.adapter_agent.children_state_update(self.device_id,
connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
yield asleep(10)
- # Change the operational status back to its previous state. With a
- # real OLT the operational state should be the state the device is
- # after a reboot.
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
- device.oper_status = previous_oper_status
- device.connect_status = previous_conn_status
- self.adapter_agent.device_update(device)
+ # Change the connection status back to REACHABLE. With a
+ # real OLT the connection state must be the actual state
+ yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
+
# Update the child devices connect state to REACHABLE
- self.adapter_agent.update_child_devices_state(self.device_id,
+ yield self.adapter_agent.children_state_update(self.device_id,
connect_status=ConnectStatus.REACHABLE)
self.log.info('rebooted', device_id=self.device_id)
@@ -652,6 +647,8 @@
def start_kpi_collection(self, device_id):
+ kafka_cluster_proxy = get_kafka_proxy()
+
def _collect(device_id, prefix):
try:
@@ -675,8 +672,11 @@
}
)
- # Step 3: submit
- self.adapter_agent.submit_kpis(kpi_event)
+ # Step 3: submit directlt to kafka bus
+ if kafka_cluster_proxy:
+ if isinstance(kpi_event, Message):
+ kpi_event = dumps(MessageToDict(kpi_event, True, True))
+ kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
except Exception as e:
log.exception('failed-to-submit-kpis', e=e)
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index d1b2e27..a9d3710 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -79,14 +79,6 @@
self.log.info('activating')
# TODO: Register for proxy address
- # # first we verify that we got parent reference and proxy info
- # assert device.parent_id
- # assert device.proxy_address.device_id
- # assert device.proxy_address.channel_id
- #
- # # register for proxied messages right away
- # self.proxy_address = device.proxy_address
- # self.adapter_agent.register_for_proxied_messages(device.proxy_address)
# populate device info
device.root = False
@@ -95,7 +87,7 @@
# device.connect_status = ConnectStatus.REACHABLE
yield self.adapter_agent.device_update(device)
- # register physical ports
+ # register physical ports
self.uni_port = Port(
port_no=2,
label='UNI facing Ethernet port',
@@ -147,12 +139,6 @@
)
)
- # def _get_uni_port(self):
- # ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
- # if ports:
- # # For now, we use on one uni port
- # return ports[0]
-
@inlineCallbacks
def _get_uni_port(self):
ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
@@ -163,12 +149,6 @@
ports = yield self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
returnValue(ports)
- # def _get_pon_port(self):
- # ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
- # if ports:
- # # For now, we use on one uni port
- # return ports[0]
-
def reconcile(self, device):
self.log.info('reconciling-ONU-device-starts')
@@ -228,27 +208,17 @@
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
- # Update the operational status to ACTIVATING and connect status to
- # UNREACHABLE
- device = self.adapter_agent.get_device(self.device_id)
- previous_oper_status = device.oper_status
- previous_conn_status = device.connect_status
- device.oper_status = OperStatus.ACTIVATING
- device.connect_status = ConnectStatus.UNREACHABLE
- self.adapter_agent.update_device(device)
+ # Update the connect status to UNREACHABLE
+ yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
yield asleep(10)
- # Change the operational status back to its previous state. With a
- # real OLT the operational state should be the state the device is
- # after a reboot.
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
- device.oper_status = previous_oper_status
- device.connect_status = previous_conn_status
- self.adapter_agent.update_device(device)
+ # Change the connection status back to REACHABLE. With a
+ # real ONU the connection state must be the actual state
+ yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
+
self.log.info('rebooted', device_id=self.device_id)
def self_test_device(self, device):
@@ -278,18 +248,6 @@
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
try:
- # Get the latest device reference
- # device = self.adapter_agent.get_device(self.device_id)
-
- # First we verify that we got parent reference and proxy info
- # assert device.parent_id
- # assert device.proxy_address.device_id
- # assert device.proxy_address.channel_id
-
- # Re-register for proxied messages right away
- # self.proxy_address = device.proxy_address
- # self.adapter_agent.register_for_proxied_messages(
- # device.proxy_address)
# Refresh the port reference - we only use one port for now
ports = yield self._get_uni_port()
@@ -314,47 +272,6 @@
port_no=self.pon_port.port_no,
oper_status=OperStatus.ACTIVE)
-
- # # Re-enable the ports on that device
- # self.adapter_agent.enable_all_ports(self.device_id)
-
-
- # Add the pon port reference to the parent
- # self.adapter_agent.add_port_reference_to_parent(device.id,
- # self.pon_port)
-
- # Update the connect status to REACHABLE
- # device.connect_status = ConnectStatus.REACHABLE
- # self.adapter_agent.update_device(device)
-
- # re-add uni port to logical device
- # parent_device = self.adapter_agent.get_device(device.parent_id)
- # logical_device_id = parent_device.parent_id
- # assert logical_device_id
- # port_no = device.proxy_address.channel_id
- # cap = OFPPF_1GB_FD | OFPPF_FIBER
- # self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- # id='uni-{}'.format(port_no),
- # ofp_port=ofp_port(
- # port_no=port_no,
- # hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- # name='uni-{}'.format(port_no),
- # config=0,
- # state=OFPPS_LIVE,
- # curr=cap,
- # advertised=cap,
- # peer=cap,
- # curr_speed=OFPPF_1GB_FD,
- # max_speed=OFPPF_1GB_FD
- # ),
- # device_id=device.id,
- # device_port_no=self.uni_port.port_no
- # ))
-
- # device = self.adapter_agent.get_device(device.id)
- # device.oper_status = OperStatus.ACTIVE
- # self.adapter_agent.update_device(device)
-
yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
self.log.info('re-enabled', device_id=self.device_id)
@@ -364,8 +281,6 @@
def delete(self):
self.log.info('deleting', device_id=self.device_id)
- # A delete request may be received when an OLT is dsiabled
-
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
diff --git a/protos/common.proto b/protos/common.proto
index aca0a36..d1234d5 100644
--- a/protos/common.proto
+++ b/protos/common.proto
@@ -42,15 +42,17 @@
PREPROVISIONED = 1;
// The device is enabled for activation and operation
- ENABLED = 3;
+ ENABLED = 2;
// The device is disabled and shall not perform its intended forwarding
// functions other than being available for re-activation.
- DISABLED = 2;
+ DISABLED = 3;
- // The deive is in the state of image download
+ // The device is in the state of image download
DOWNLOADING_IMAGE = 4;
+ // The device is marked to be deleted
+ DELETED = 5;
}
}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index bccb227..6d78aa4 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -81,6 +81,116 @@
return unPackResponse(rpc, device.Id, success, result)
}
+
+
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reenable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reboot_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
+ rpc := "delete_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+ log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
+ log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
+ if success {
+ unpackResult := &ca.SwitchCapability{}
+ if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return unpackResult, nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
+}
+
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+ log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ pNo := &ca.IntType{Val: int64(portNo)}
+ args[1] = &kafka.KVArg{
+ Key: "port_no",
+ Value: pNo,
+ }
+
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
+ log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
+ if success {
+ unpackResult := &ca.PortCapability{}
+ if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return unpackResult, nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
+}
+
+//TODO: Implement the functions below
+
func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
log.Debug("AdapterDescriptor")
return nil, nil
@@ -106,30 +216,6 @@
return nil
}
-func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
- log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
- rpc := "reenable_device"
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
- log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
- return unPackResponse(rpc, device.Id, success, result)
-}
-
-func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
- log.Debug("RebootDevice")
- return nil
-}
-
-func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
- log.Debug("DeleteDevice")
- return nil
-}
-
func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
log.Debug("GetDeviceDetails")
return nil, nil
@@ -193,68 +279,4 @@
func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
log.Debug("UnSuppressAlarm")
return nil
-}
-
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
- log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
- log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
- if success {
- unpackResult := &ca.SwitchCapability{}
- if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return unpackResult, nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
-
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
- log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
- topic := kafka.Topic{Name: device.Type}
- args := make([]*kafka.KVArg, 2)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- pNo := &ca.IntType{Val: int64(portNo)}
- args[1] = &kafka.KVArg{
- Key: "port_no",
- Value: pNo,
- }
-
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
- log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
- if success {
- unpackResult := &ca.PortCapability{}
- if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return unpackResult, nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
+}
\ No newline at end of file
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 7ae9f1a..0c0609e 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -246,14 +246,12 @@
}
}
}
-
log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
"deviceType": dt.Val, "channelId": chnlId.Val})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// Run child detection in it's own go routine as it can be a lengthy process
go rhp.deviceMgr.childDeviceDetected(pID.Id, portNo.Val, dt.Val, chnlId.Val)
@@ -281,29 +279,60 @@
log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
return nil, err
}
- //if operStatus.Val == -1 {
- // operStatus = nil
- //}
case "connect_status":
if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
- //if connStatus.Val == -1 {
- // connStatus = nil
- //}
}
}
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
return new(empty.Empty), nil
}
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+ if len(args) < 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ deviceId := &voltha.ID{}
+ operStatus := &ca.IntType{}
+ connStatus := &ca.IntType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "oper_status":
+ if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+ log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+ return nil, err
+ }
+ case "connect_status":
+ if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
+ log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+ if rhp.TestMode { // Execute only for test cases
+ return nil, nil
+ }
+
+ // When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
+ go rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+ return new(empty.Empty), nil
+}
+
func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -369,21 +398,14 @@
}
}
}
-
log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
-
// Run port creation in its own go routine
go rhp.deviceMgr.addPort(deviceId.Id, port)
- //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- // log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- // return nil, status.Errorf(codes.Internal, "%s", err.Error())
- //}
- // Return an Ack
return new(empty.Empty), nil
}
@@ -409,7 +431,6 @@
}
}
}
-
log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
"init": init})
@@ -420,11 +441,5 @@
// Run PM config update in its own go routine
go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
- //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- // log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
- // return nil, status.Errorf(codes.Internal, "%s", err.Error())
- //}
- // Return an Ack
return new(empty.Empty), nil
-
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 480e32f..a8d5abf 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -62,8 +62,8 @@
log.Info("starting-core")
core.startKafkaMessagingProxy(ctx)
log.Info("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = NewDeviceManager(core.kmp, core.clusterDataProxy)
- core.logicalDeviceMgr = NewLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
+ core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
core.registerAdapterRequestHandler(ctx, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
@@ -137,12 +137,12 @@
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
// logicaldevicemanager to initiate the creation of logical devices
log.Info("starting-DeviceManager")
- core.deviceMgr.Start(ctx, core.logicalDeviceMgr)
+ core.deviceMgr.start(ctx, core.logicalDeviceMgr)
log.Info("started-DeviceManager")
}
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
log.Info("starting-Logical-DeviceManager")
- core.logicalDeviceMgr.Start(ctx)
+ core.logicalDeviceMgr.start(ctx)
log.Info("started-Logical-DeviceManager")
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index d9dacbc..52ab584 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -40,6 +40,8 @@
lockDevice sync.RWMutex
}
+//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
+//preprovisioning
func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
@@ -55,6 +57,7 @@
return &agent
}
+// start save the device to the data model and registers for callbacks on that device
func (agent *DeviceAgent) start(ctx context.Context) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -64,12 +67,12 @@
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
- //agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
log.Debug("device-agent-started")
}
-func (agent *DeviceAgent) Stop(ctx context.Context) {
+// stop stops the device agent. Not much to do for now
+func (agent *DeviceAgent) stop(ctx context.Context) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
@@ -77,6 +80,7 @@
log.Debug("device-agent-stopped")
}
+// getDevice retrieves the latest device information from the data model
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -89,7 +93,7 @@
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
-//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
@@ -101,6 +105,7 @@
return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
+// enableDevice activates a preprovisioned or disable device
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -113,6 +118,7 @@
//TODO: Needs customized error message
return nil
}
+ //TODO: if parent device is disabled then do not enable device
// Verify whether we need to adopt the device the first time
// TODO: A state machine for these state transitions would be better (we just have to handle
// a limited set of states now or it may be an overkill)
@@ -140,6 +146,7 @@
return nil
}
+//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
agent.lockDevice.Lock()
//defer agent.lockDevice.Unlock()
@@ -183,6 +190,70 @@
return nil
}
+func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("rebootDevice", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState != voltha.AdminState_DISABLED {
+ log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
+ }
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
+ log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
+ agent.lockDevice.Lock()
+ log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState != voltha.AdminState_DISABLED {
+ log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
+ }
+ // Send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+ log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ agent.lockDevice.Unlock()
+ return err
+ }
+ // Set the device Admin state to DELETED in order to trigger the callback to delete
+ // child devices, if any
+ // Received an Ack (no error found above). Now update the device in the model to the expected state
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_DELETED
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ agent.lockDevice.Unlock()
+ //TODO: callback will be invoked to handle this state change
+ //For now force the state transition to happen
+ if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+ log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+ return err
+ }
+
+ }
+ return nil
+}
+
+// getPorts retrieves the ports information of the device based on the port type.
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
ports := &voltha.Ports{}
@@ -196,6 +267,8 @@
return ports
}
+// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
+// parent device
func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
@@ -211,6 +284,8 @@
}
}
+// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
+// device
func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.getDevice(agent.deviceId); device == nil {
@@ -226,6 +301,8 @@
}
}
+// TODO: implement when callback from the data model is ready
+// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
@@ -405,11 +482,20 @@
break
}
}
+ //To track an issue when adding peer-port.
+ log.Debugw("before-peer-added", log.Fields{"device": cloned})
// Store the device
afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
+ //To track an issue when adding peer-port.
+ if d, ok := afterUpdate.(*voltha.Device); ok {
+ log.Debugw("after-peer-added", log.Fields{"device": d})
+ } else {
+ log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
+ }
+
return nil
}
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e3dbed2..1063e29 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -42,7 +42,7 @@
lockDeviceAgentsMap sync.RWMutex
}
-func NewDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
+func newDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
@@ -53,14 +53,14 @@
return &deviceMgr
}
-func (dMgr *DeviceManager) Start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
+func (dMgr *DeviceManager) start(ctx context.Context, logicalDeviceMgr *LogicalDeviceManager) {
log.Info("starting-device-manager")
dMgr.logicalDeviceMgr = logicalDeviceMgr
dMgr.stateTransitions = NewTransitionMap(dMgr)
log.Info("device-manager-started")
}
-func (dMgr *DeviceManager) Stop(ctx context.Context) {
+func (dMgr *DeviceManager) stop(ctx context.Context) {
log.Info("stopping-device-manager")
dMgr.exitChannel <- 1
log.Info("device-manager-stopped")
@@ -86,7 +86,14 @@
}
}
+func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ delete(dMgr.deviceAgents, agent.deviceId)
+}
+
func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
+ // TODO If the device is not in memory it needs to be loaded first
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
if agent, ok := dMgr.deviceAgents[deviceId]; ok {
@@ -108,7 +115,6 @@
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("enableDevice", log.Fields{"deviceid": id})
-
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.enableDevice(ctx)
@@ -122,7 +128,6 @@
func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
log.Debugw("disableDevice", log.Fields{"deviceid": id})
-
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
res = agent.disableDevice(ctx)
@@ -134,6 +139,34 @@
sendResponse(ctx, ch, res)
}
+func (dMgr *DeviceManager) rebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ log.Debugw("rebootDevice", log.Fields{"deviceid": id})
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ res = agent.rebootDevice(ctx)
+ log.Debugw("rebootDevice-result", log.Fields{"result": res})
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", id.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) deleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ log.Debugw("deleteDevice", log.Fields{"deviceid": id})
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ res = agent.deleteDevice(ctx)
+ if res == nil { //Success
+ agent.stop(ctx)
+ dMgr.deleteDeviceAgentToMap(agent)
+ }
+ log.Debugw("deleteDevice-result", log.Fields{"result": res})
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", id.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
log.Debugw("getDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -158,7 +191,6 @@
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
-
if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
return agent.updateDevice(device)
}
@@ -195,7 +227,6 @@
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
}
@@ -204,7 +235,6 @@
func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPorts(ctx, portType), nil
}
@@ -214,7 +244,6 @@
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
-
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
}
@@ -229,6 +258,30 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
+func (dMgr *DeviceManager) updateChildrenStatus(deviceId string, operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ log.Debugw("updateChildrenStatus", log.Fields{"parentDeviceid": deviceId, "operStatus": operStatus, "connStatus": connStatus})
+ var parentDevice *voltha.Device
+ var err error
+ if parentDevice, err = dMgr.getDevice(deviceId); err != nil {
+ return status.Errorf(codes.Aborted, "%s", err.Error())
+ }
+ var childDeviceIds []string
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.Aborted, "%s", err.Error())
+ }
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ }
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.updateDeviceStatus(operStatus, connStatus); err != nil {
+ return status.Errorf(codes.Aborted, "childDevice:%s, error:%s", childDeviceId, err.Error())
+ }
+ }
+ }
+ return nil
+}
+
func (dMgr *DeviceManager) updatePortState(deviceId string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
log.Debugw("updatePortState", log.Fields{"deviceid": deviceId, "portType": portType, "portNo": portNo, "operStatus": operStatus})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -274,10 +327,6 @@
return err
}
}
- //if handler != nil {
- // log.Debugw("found-handlers", log.Fields{"handlers": funcName(handler)})
- // return handler(current)
- //}
return nil
}
@@ -285,7 +334,7 @@
log.Info("createLogicalDevice")
var logicalId *string
var err error
- if logicalId, err = dMgr.logicalDeviceMgr.CreateLogicalDevice(nil, cDevice); err != nil {
+ if logicalId, err = dMgr.logicalDeviceMgr.createLogicalDevice(nil, cDevice); err != nil {
log.Warnw("createlogical-device-error", log.Fields{"device": cDevice})
return err
}
@@ -297,7 +346,7 @@
func (dMgr *DeviceManager) deleteLogicalDevice(cDevice *voltha.Device) error {
log.Info("deleteLogicalDevice")
var err error
- if err = dMgr.logicalDeviceMgr.DeleteLogicalDevice(nil, cDevice); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(nil, cDevice); err != nil {
log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
return err
}
@@ -310,13 +359,10 @@
func (dMgr *DeviceManager) deleteLogicalPort(cDevice *voltha.Device) error {
log.Info("deleteLogicalPort")
var err error
- if err = dMgr.logicalDeviceMgr.DeleteLogicalPort(nil, cDevice); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteLogicalPort(nil, cDevice); err != nil {
log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": cDevice.Id})
return err
}
- //// Remove the logical device Id from the parent device
- //logicalId := ""
- //dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
return nil
}
@@ -330,38 +376,75 @@
return parentDevice
}
-func (dMgr *DeviceManager) disableAllChildDevices(cDevice *voltha.Device) error {
+/*
+All the functions below are callback functions where they are invoked with the latest and previous data. We can
+therefore use the data as is without trying to get the latest from the model.
+*/
+
+//disableAllChildDevices is invoked as a callback when the parent device is disabled
+func (dMgr *DeviceManager) disableAllChildDevices(parentDevice *voltha.Device) error {
log.Debug("disableAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(cDevice); err != nil {
- return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
if len(childDeviceIds) == 0 {
- log.Debugw("no-child-device", log.Fields{"deviceId": cDevice.Id})
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
}
+ allChildDisable := true
for _, childDeviceId := range childDeviceIds {
if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
if err = agent.disableDevice(nil); err != nil {
log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ allChildDisable = false
}
}
}
+ if !allChildDisable {
+ return err
+ }
return nil
}
-func (dMgr *DeviceManager) getAllChildDeviceIds(cDevice *voltha.Device) ([]string, error) {
- log.Info("getAllChildDeviceIds")
- // Get latest device info
- var device *voltha.Device
+//deleteAllChildDevices is invoked as a callback when the parent device is deleted
+func (dMgr *DeviceManager) deleteAllChildDevices(parentDevice *voltha.Device) error {
+ log.Debug("deleteAllChildDevices")
+ var childDeviceIds []string
var err error
- if device, err = dMgr.getDevice(cDevice.Id); err != nil {
- return nil, status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+ }
+ allChildDeleted := true
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.deleteDevice(nil); err != nil {
+ log.Errorw("failure-delete-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ allChildDeleted = false
+ } else {
+ agent.stop(nil)
+ dMgr.deleteDeviceAgentToMap(agent)
+ }
+ }
+ }
+ if !allChildDeleted {
+ return err
+ }
+ return nil
+}
+
+//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+ log.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
childDeviceIds := make([]string, 0)
- for _, port := range device.Ports {
- for _, peer := range port.Peers {
- childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ if parentDevice != nil {
+ for _, port := range parentDevice.Ports {
+ for _, peer := range port.Peers {
+ childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ }
}
}
return childDeviceIds, nil
@@ -369,7 +452,7 @@
func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
- if err := dMgr.logicalDeviceMgr.AddUNILogicalPort(nil, cDevice); err != nil {
+ if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
return err
}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index a84f03a..b3fecc0 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -112,7 +112,7 @@
handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.notAllowed}})
@@ -128,14 +128,12 @@
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
handlers: []TransitionHandler{dMgr.reEnableDevice}})
-
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handlers: []TransitionHandler{dMgr.notAllowed}})
-
+ currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ handlers: []TransitionHandler{dMgr.deleteAllChildDevices}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d446438..e036998 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -40,12 +40,34 @@
logicalDeviceMgr: lDeviceMgr}
return handler
}
+
+// isTestMode is a helper function to determine a function is invoked for testing only
func isTestMode(ctx context.Context) bool {
md, _ := metadata.FromIncomingContext(ctx)
_, exist := md[common.TestModeKeys_api_test.String()]
return exist
}
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// response is expected in a successful scenario
+func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
+ select {
+ case res := <-ch:
+ if res == nil {
+ return new(empty.Empty), nil
+ } else if err, ok := res.(error); ok {
+ return new(empty.Empty), err
+ } else {
+ log.Warnw("unexpected-return-type", log.Fields{"result": res})
+ err = status.Errorf(codes.Internal, "%s", res)
+ return new(empty.Empty), err
+ }
+ case <-ctx.Done():
+ log.Debug("client-timeout")
+ return nil, ctx.Err()
+ }
+}
+
func (handler *APIHandler) UpdateLogLevel(ctx context.Context, logging *voltha.Logging) (*empty.Empty, error) {
log.Debugw("UpdateLogLevel-request", log.Fields{"newloglevel": logging.Level, "intval": int(logging.Level)})
out := new(empty.Empty)
@@ -53,7 +75,7 @@
return out, nil
}
-func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan error) {
+func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
log.Debugw("processEnableDevicePort", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch <- status.Errorf(100, "%d-%s", 100, "erreur")
}
@@ -64,15 +86,10 @@
out := new(empty.Empty)
return out, nil
}
- ch := make(chan error)
+ ch := make(chan interface{})
+ defer close(ch)
go processEnableDevicePort(ctx, id, ch)
- select {
- case resp := <-ch:
- close(ch)
- return new(empty.Empty), resp
- case <-ctx.Done():
- return nil, ctx.Err()
- }
+ return waitForNilResponseOnSuccess(ctx, ch)
}
func (handler *APIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
@@ -126,6 +143,7 @@
return handler.logicalDeviceMgr.listLogicalDevices()
}
+// CreateDevice creates a new parent device in the data model
func (handler *APIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
log.Debugw("createdevice", log.Fields{"device": *device})
if isTestMode(ctx) {
@@ -153,75 +171,52 @@
}
}
+// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (handler *APIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("enabledevice", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enableDevice(ctx, id, ch)
- select {
- case res := <-ch:
- if res == nil {
- return new(empty.Empty), nil
- } else if err, ok := res.(error); ok {
- return new(empty.Empty), err
- } else {
- log.Warnw("enable-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return new(empty.Empty), err
- }
- case <-ctx.Done():
- log.Debug("enabledevice-client-timeout")
- return nil, ctx.Err()
- }
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+// DisableDevice disables a device along with any child device it may have
func (handler *APIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("disabledevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.disableDevice(ctx, id, ch)
- select {
- case res := <-ch:
- if res == nil {
- return new(empty.Empty), nil
- } else if err, ok := res.(error); ok {
- return new(empty.Empty), err
- } else {
- log.Warnw("disable-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return new(empty.Empty), err
- }
- case <-ctx.Done():
- log.Debug("enabledevice-client-timeout")
- return nil, ctx.Err()
- }
- return nil, errors.New("Unimplemented")
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+//RebootDevice invoked the reboot API to the corresponding adapter
func (handler *APIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- log.Debugw("disabledevice-request", log.Fields{"id": id})
+ log.Debugw("rebootDevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
- return nil, errors.New("Unimplemented")
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.rebootDevice(ctx, id, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
}
+// DeleteDevice removes a device from the data model
func (handler *APIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
log.Debugw("deletedevice-request", log.Fields{"id": id})
if isTestMode(ctx) {
- out := new(empty.Empty)
- return out, nil
+ return new(empty.Empty), nil
}
- return nil, errors.New("Unimplemented")
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.deleteDevice(ctx, id, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
}
func (handler *APIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 218478f..dce2db7 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -39,7 +39,7 @@
lockLogicalDevice sync.RWMutex
}
-func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
+func newLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
cdProxy *model.Proxy) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
@@ -52,7 +52,8 @@
return &agent
}
-func (agent *LogicalDeviceAgent) Start(ctx context.Context) error {
+// start creates the logical device and add it to the data model
+func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
var switchCap *ca.SwitchCapability
@@ -97,6 +98,22 @@
return nil
}
+// stop stops the logical devuce agent. This removes the logical device from the data model.
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+ log.Info("stopping-logical_device-agent")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ //Remove the logical device from the model
+ if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
+ agent.exitChannel <- 1
+ log.Info("logical_device-agent-stopped")
+}
+
+// getLogicalDevice locks the logical device model and then retrieves the latest logical device information
func (agent *LogicalDeviceAgent) getLogicalDevice() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDevice")
agent.lockLogicalDevice.Lock()
@@ -109,6 +126,8 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+// getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it. This is used only by
+// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
log.Debug("getLogicalDeviceWithoutLock")
logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
@@ -119,6 +138,7 @@
return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
}
+// addUNILogicalPort creates a UNI port on the logical device that represents a child device
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, portNo uint32) error {
log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
@@ -179,16 +199,4 @@
return nil
}
-func (agent *LogicalDeviceAgent) Stop(ctx context.Context) {
- log.Info("stopping-logical_device-agent")
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
- //Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
- log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- } else {
- log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
- }
- agent.exitChannel <- 1
- log.Info("logical_device-agent-stopped")
-}
+
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index bef078c..8f8548a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -38,7 +38,7 @@
lockLogicalDeviceAgentsMap sync.RWMutex
}
-func NewLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.exitChannel = make(chan int, 1)
logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
@@ -49,12 +49,12 @@
return &logicalDeviceMgr
}
-func (ldMgr *LogicalDeviceManager) Start(ctx context.Context) {
+func (ldMgr *LogicalDeviceManager) start(ctx context.Context) {
log.Info("starting-logical-device-manager")
log.Info("logical-device-manager-started")
}
-func (ldMgr *LogicalDeviceManager) Stop(ctx context.Context) {
+func (ldMgr *LogicalDeviceManager) stop(ctx context.Context) {
log.Info("stopping-logical-device-manager")
ldMgr.exitChannel <- 1
log.Info("logical-device-manager-stopped")
@@ -105,7 +105,7 @@
return result, nil
}
-func (ldMgr *LogicalDeviceManager) CreateLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
+func (ldMgr *LogicalDeviceManager) createLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -124,15 +124,15 @@
}
log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+ agent := newLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
- go agent.Start(ctx)
+ go agent.start(ctx)
log.Debug("creating-logical-device-ends")
return &id, nil
}
-func (ldMgr *LogicalDeviceManager) DeleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
@@ -141,7 +141,7 @@
logDeviceId := device.ParentId
if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
// Stop the logical device agent
- agent.Stop(ctx)
+ agent.stop(ctx)
//Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceId)
}
@@ -165,7 +165,7 @@
}
// DeleteLogicalDevice removes the logical port associated with a child device
-func (ldMgr *LogicalDeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) deleteLogicalPort(ctx context.Context, device *voltha.Device) error {
log.Debugw("deleting-logical-port", log.Fields{"deviceId": device.Id})
// Sanity check
if device.Root {
@@ -184,7 +184,7 @@
return nil
}
-func (ldMgr *LogicalDeviceManager) AddUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
+func (ldMgr *LogicalDeviceManager) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
// Sanity check
if childDevice.Root {