[VOL-1036] Initial implementation of device lifecycle management
Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/adapters/iadapter.py b/adapters/iadapter.py
index c6ea3ca..3388d5a 100644
--- a/adapters/iadapter.py
+++ b/adapters/iadapter.py
@@ -83,12 +83,16 @@
raise NotImplementedError()
def get_ofp_device_info(self, device):
- log.debug('get_ofp_device_info', device_id=device.id)
- return self.devices_handlers[device.id].get_ofp_device_info(device)
+ log.debug('get_ofp_device_info_start', device_id=device.id)
+ ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(device)
+ log.debug('get_ofp_device_info_ends', device_id=device.id)
+ return ofp_device_info
def get_ofp_port_info(self, device, port_no):
- log.debug('get_ofp_port_info', device_id=device.id, port_no=port_no)
- return self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
+ log.debug('get_ofp_port_info_start', device_id=device.id, port_no=port_no)
+ ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
+ log.debug('get_ofp_port_info_ends', device_id=device.id, port_no=port_no)
+ return ofp_port_info
def adopt_device(self, device):
log.debug('adopt_device', device_id=device.id)
@@ -105,7 +109,8 @@
def disable_device(self, device):
log.info('disable-device', device_id=device.id)
- reactor.callLater(0, self.devices_handlers[device.id].disable)
+ reactor.callLater(1, self.devices_handlers[device.id].disable)
+ log.debug('disable_device_done', device_id=device.id)
return device
def reenable_device(self, device):
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 74ed934..2517a31 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -82,17 +82,17 @@
d = Device()
if device:
device.Unpack(d)
- return (True, self.adapter.adopt_device(d))
+ return True, self.adapter.adopt_device(d)
else:
- return (False, d)
+ return False, d
def get_ofp_device_info(self, device):
d = Device()
if device:
device.Unpack(d)
- return (True, self.adapter.get_ofp_device_info(d))
+ return True, self.adapter.get_ofp_device_info(d)
else:
- return (False, d)
+ return False, d
def get_ofp_port_info(self, device, port_no):
d = Device()
@@ -104,7 +104,7 @@
p = IntType()
port_no.Unpack(p)
- return (True, self.adapter.get_ofp_port_info(d, p.val))
+ return True, self.adapter.get_ofp_port_info(d, p.val)
def reconcile_device(self, device):
@@ -114,10 +114,20 @@
return self.adapter.abandon_device(device)
def disable_device(self, device):
- return self.adapter.disable_device(device)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return True, self.adapter.disable_device(d)
+ else:
+ return False, d
def reenable_device(self, device):
- return self.adapter.reenable_device(device)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return True, self.adapter.reenable_device(d)
+ else:
+ return False, d
def reboot_device(self, device):
d = Device()
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index bcc4239..32a4c9d 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -32,18 +32,18 @@
from adapters.common.utils.id_generation import create_cluster_logical_device_ids
from adapters.interface import IAdapterInterface
from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, PmConfigs
+from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
AlarmEventSeverity, AlarmEventState, AlarmEventCategory
from adapters.protos.events_pb2 import KpiEvent
from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey, CoreInstance
+ LogicalPort, AlarmFilterRuleKey, CoreInstance
from adapters.common.utils.registry import registry, IComponent
from adapters.common.utils.id_generation import create_cluster_device_id
import re
from adapters.interface import ICoreSouthBoundInterface
from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
from google.protobuf.message import Message
from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
@@ -159,8 +159,17 @@
# def add_device(self, device):
# raise NotImplementedError()
+ @wrap_request(Ports)
+ @inlineCallbacks
def get_ports(self, device_id, port_type):
- raise NotImplementedError()
+ id = ID()
+ id.id = device_id
+ p_type = IntType()
+ p_type.val = port_type
+ res = yield self.invoke(rpc="GetPorts",
+ device_id=id,
+ port_type=p_type)
+ returnValue(res)
def get_child_devices(self, parent_device_id):
raise NotImplementedError()
@@ -231,20 +240,18 @@
def device_state_update(self, device_id,
oper_status=None,
connect_status=None):
-
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status:
+ if oper_status or oper_status==OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status:
+ if connect_status or connect_status==ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
- a_status = IntType()
res = yield self.invoke(rpc="DeviceStateUpdate",
device_id=id,
@@ -254,34 +261,52 @@
@wrap_request(None)
@inlineCallbacks
+ def port_state_update(self,
+ device_id,
+ port_type,
+ port_no,
+ oper_status):
+ id = ID()
+ id.id = device_id
+ pt = IntType()
+ pt.val = port_type
+ pNo = IntType()
+ pNo.val = port_no
+ o_status = IntType()
+ o_status.val = oper_status
+
+ res = yield self.invoke(rpc="PortStateUpdate",
+ device_id=id,
+ port_type=pt,
+ port_no=pNo,
+ oper_status=o_status)
+ returnValue(res)
+
+
+
+ @wrap_request(None)
+ @inlineCallbacks
def child_devices_state_update(self, parent_device_id,
oper_status=None,
- connect_status=None,
- admin_state=None):
+ connect_status=None):
id = ID()
id.id = parent_device_id
o_status = IntType()
- if oper_status:
+ if oper_status or oper_status==OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status:
+ if connect_status or connect_status==ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
- a_status = IntType()
- if admin_state:
- a_status.val = admin_state
- else:
- a_status.val = -1
res = yield self.invoke(rpc="child_devices_state_update",
parent_device_id=id,
oper_status=o_status,
- connect_status=c_status,
- admin_state=a_status)
+ connect_status=c_status)
returnValue(res)
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 5e096b4..95bafaa 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -25,7 +25,7 @@
import structlog
from scapy.layers.l2 import Ether, Dot1Q
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
from grpc._channel import _Rendezvous
from adapters.common.frameio.frameio import BpfProgramFilter, hexify
@@ -245,11 +245,10 @@
self.log.info('grpc-channel-closed')
+ @inlineCallbacks
def _get_nni_port(self):
- ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
- if ports:
- # For now, we use on one NNI port
- return ports[0]
+ ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ returnValue(ports)
@inlineCallbacks
def activate(self, device):
@@ -272,7 +271,8 @@
device.vendor = 'ponsim'
device.model = 'n/a'
device.serial_number = device.host_and_port
- device.connect_status = ConnectStatus.REACHABLE
+ device.mac_address = "AA:BB:CC:DD:EE:FF"
+ # device.connect_status = ConnectStatus.REACHABLE
yield self.adapter_agent.device_update(device)
# Now set the initial PM configuration for this device
@@ -288,7 +288,7 @@
port_no=info.nni_port,
label='NNI facing Ethernet port',
type=Port.ETHERNET_NNI,
- admin_state=AdminState.ENABLED,
+ # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
)
self.nni_port = nni_port
@@ -297,10 +297,11 @@
port_no=1,
label='PON port',
type=Port.PON_OLT,
- admin_state=AdminState.ENABLED,
+ # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
))
- yield self.adapter_agent.device_state_update(device.id, oper_status=OperStatus.ACTIVE)
+
+ yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
# register ONUS
self.log.info('onu-found', onus=info.onus, len=len(info.onus))
@@ -319,7 +320,7 @@
# TODO
# Start collecting stats from the device after a brief pause
- # self.start_kpi_collection(device.id)
+ self.start_kpi_collection(device.id)
except Exception as e:
log.exception("Exception-activating", e=e)
@@ -577,138 +578,69 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
+
+ @inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
self.stop_kpi_collection()
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
-
- # Update the operational status to UNKNOWN
- device.oper_status = OperStatus.UNKNOWN
- device.connect_status = ConnectStatus.UNREACHABLE
- self.adapter_agent.device_update(device)
-
- # Remove the logical device
- logical_device = self.adapter_agent.get_logical_device(
- self.logical_device_id)
- self.adapter_agent.delete_logical_device(logical_device)
-
- # Disable all child devices first
- self.adapter_agent.update_child_devices_state(self.device_id,
- admin_state=AdminState.DISABLED)
-
- # Remove the peer references from this device
- self.adapter_agent.delete_all_peer_references(self.device_id)
-
- # Set all ports to disabled
- self.adapter_agent.disable_all_ports(self.device_id)
+ # Update the operational status to UNKNOWN and connection status to UNREACHABLE
+ yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
self.close_channel()
self.log.info('disabled-grpc-channel')
- # Update the logice device mapping
- if self.logical_device_id in \
- self.adapter.logical_device_id_to_root_device_id:
- del self.adapter.logical_device_id_to_root_device_id[
- self.logical_device_id]
-
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
- self.log.info('disabled', device_id=device.id)
+ self.log.info('disabled', device_id=self.device_id)
+ @inlineCallbacks
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
-
# Set the ofp_port_no and nni_port in case we bypassed the reconcile
# process if the device was in DISABLED state on voltha restart
if not self.ofp_port_no and not self.nni_port:
- stub = ponsim_pb2.PonSimStub(self.get_channel())
+ yield self.get_channel()
+ stub = ponsim_pb2.PonSimStub(self.channel)
info = stub.GetDeviceInfo(Empty())
log.info('got-info', info=info)
self.ofp_port_no = info.nni_port
- self.nni_port = self._get_nni_port()
+ ports = yield self._get_nni_port()
+ # For ponsim, we are using only 1 NNI port
+ if ports.items:
+ self.nni_port = ports.items[0]
- # Update the connect status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.device_update(device)
+ # Update the state of the NNI port
+ yield self.adapter_agent.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_NNI,
+ port_no=self.ofp_port_no,
+ oper_status=OperStatus.ACTIVE)
- # Set all ports to enabled
- self.adapter_agent.enable_all_ports(self.device_id)
+ # Update the state of the PON port
+ yield self.adapter_agent.port_state_update(self.device_id,
+ port_type=Port.PON_OLT,
+ port_no=1,
+ oper_status=OperStatus.ACTIVE)
- ld = LogicalDevice(
- # not setting id and datapth_id will let the adapter agent pick id
- desc=ofp_desc(
- hw_desc='simulated pon',
- sw_desc='simulated pon',
- serial_num=uuid4().hex,
- dp_desc='n/a'
- ),
- switch_features=ofp_switch_features(
- n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
- capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
- )
- ),
- root_device_id=device.id
- )
- mac_address = "AA:BB:CC:DD:EE:FF"
- ld_initialized = self.adapter_agent.create_logical_device(ld,
- dpid=mac_address)
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
- id='nni',
- ofp_port=ofp_port(
- port_no=self.ofp_port_no,
- hw_addr=mac_str_to_tuple(
- '00:00:00:00:00:%02x' % self.ofp_port_no),
- name='nni',
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=self.nni_port.port_no,
- root_port=True
- ))
+ # Set the operational state of the device to ACTIVE and connect status to REACHABLE
+ yield self.adapter_agent.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
- device = self.adapter_agent.get_device(device.id)
- device.parent_id = ld_initialized.id
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.device_update(device)
- self.logical_device_id = ld_initialized.id
+ # TODO: establish frame grpc-stream
+ # yield reactor.callInThread(self.rcv_grpc)
- # Reenable all child devices
- self.adapter_agent.update_child_devices_state(device.id,
- admin_state=AdminState.ENABLED)
+ self.start_kpi_collection(self.device_id)
- # establish frame grpc-stream
- reactor.callInThread(self.rcv_grpc)
-
- self.start_kpi_collection(device.id)
-
- self.log.info('re-enabled', device_id=device.id)
+ self.log.info('re-enabled', device_id=self.device_id)
def delete(self):
self.log.info('deleting', device_id=self.device_id)
- # Remove all child devices
- self.adapter_agent.delete_all_child_devices(self.device_id)
-
self.close_channel()
self.log.info('disabled-grpc-channel')
@@ -725,7 +657,7 @@
try:
# Step 1: gather metrics from device
port_metrics = \
- self.pm_metrics.collect_port_metrics(self.get_channel())
+ self.pm_metrics.collect_port_metrics(self.channel)
# Step 2: prepare the KpiEvent for submission
# we can time-stamp them here (or could use time derived from OLT
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index 7e35c7f..d1b2e27 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -20,7 +20,7 @@
import sys
import structlog
-from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
from adapters.common.utils.asleep import asleep
from adapters.iadapter import OnuAdapter
@@ -92,10 +92,10 @@
device.root = False
device.vendor = 'ponsim'
device.model = 'n/a'
- device.connect_status = ConnectStatus.REACHABLE
+ # device.connect_status = ConnectStatus.REACHABLE
yield self.adapter_agent.device_update(device)
- # register physical ports
+ # register physical ports
self.uni_port = Port(
port_no=2,
label='UNI facing Ethernet port',
@@ -119,7 +119,7 @@
self.adapter_agent.port_created(device.id, self.uni_port)
self.adapter_agent.port_created(device.id, self.pon_port)
- yield self.adapter_agent.device_state_update(device.id, oper_status=OperStatus.ACTIVE)
+ yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
def get_ofp_port_info(self, device, port_no):
@@ -147,17 +147,27 @@
)
)
- def _get_uni_port(self):
- ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
- if ports:
- # For now, we use on one uni port
- return ports[0]
+ # def _get_uni_port(self):
+ # ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ # if ports:
+ # # For now, we use on one uni port
+ # return ports[0]
+ @inlineCallbacks
+ def _get_uni_port(self):
+ ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ returnValue(ports)
+
+ @inlineCallbacks
def _get_pon_port(self):
- ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
- if ports:
- # For now, we use on one uni port
- return ports[0]
+ ports = yield self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ returnValue(ports)
+
+ # def _get_pon_port(self):
+ # ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ # if ports:
+ # # For now, we use on one uni port
+ # return ports[0]
def reconcile(self, device):
self.log.info('reconciling-ONU-device-starts')
@@ -250,114 +260,104 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
+
+ @inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
- # Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
-
- # Disable all ports on that device
- self.adapter_agent.disable_all_ports(self.device_id)
-
# Update the device operational status to UNKNOWN
- device.oper_status = OperStatus.UNKNOWN
- device.connect_status = ConnectStatus.UNREACHABLE
- self.adapter_agent.update_device(device)
-
- # Remove the uni logical port from the OLT, if still present
- parent_device = self.adapter_agent.get_device(device.parent_id)
- assert parent_device
- logical_device_id = parent_device.parent_id
- assert logical_device_id
- port_no = device.proxy_address.channel_id
- port_id = 'uni-{}'.format(port_no)
- try:
- port = self.adapter_agent.get_logical_port(logical_device_id,
- port_id)
- self.adapter_agent.delete_logical_port(logical_device_id, port)
- except KeyError:
- self.log.info('logical-port-not-found', device_id=self.device_id,
- portid=port_id)
-
- # Remove pon port from parent
- self.pon_port = self._get_pon_port()
- self.adapter_agent.delete_port_reference_from_parent(self.device_id,
- self.pon_port)
-
- # Just updating the port status may be an option as well
- # port.ofp_port.config = OFPPC_NO_RECV
- # yield self.adapter_agent.update_logical_port(logical_device_id,
- # port)
- # Unregister for proxied message
- self.adapter_agent.unregister_for_proxied_messages(
- device.proxy_address)
+ yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
- self.log.info('disabled', device_id=device.id)
+ self.log.info('disabled', device_id=self.device_id)
+ @inlineCallbacks
def reenable(self):
self.log.info('re-enabling', device_id=self.device_id)
try:
# Get the latest device reference
- device = self.adapter_agent.get_device(self.device_id)
+ # device = self.adapter_agent.get_device(self.device_id)
# First we verify that we got parent reference and proxy info
- assert device.parent_id
- assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
+ # assert device.parent_id
+ # assert device.proxy_address.device_id
+ # assert device.proxy_address.channel_id
# Re-register for proxied messages right away
- self.proxy_address = device.proxy_address
- self.adapter_agent.register_for_proxied_messages(
- device.proxy_address)
+ # self.proxy_address = device.proxy_address
+ # self.adapter_agent.register_for_proxied_messages(
+ # device.proxy_address)
- # Re-enable the ports on that device
- self.adapter_agent.enable_all_ports(self.device_id)
+ # Refresh the port reference - we only use one port for now
+ ports = yield self._get_uni_port()
+ self.log.info('re-enabling-uni-ports', ports=ports)
+ if ports.items:
+ self.uni_port = ports.items[0]
- # Refresh the port reference
- self.uni_port = self._get_uni_port()
- self.pon_port = self._get_pon_port()
+ ports = yield self._get_pon_port()
+ self.log.info('re-enabling-pon-ports', ports=ports)
+ if ports.items:
+ self.pon_port = ports.items[0]
+
+ # Update the state of the UNI port
+ yield self.adapter_agent.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_UNI,
+ port_no=self.uni_port.port_no,
+ oper_status=OperStatus.ACTIVE)
+
+ # Update the state of the PON port
+ yield self.adapter_agent.port_state_update(self.device_id,
+ port_type=Port.PON_ONU,
+ port_no=self.pon_port.port_no,
+ oper_status=OperStatus.ACTIVE)
+
+
+ # # Re-enable the ports on that device
+ # self.adapter_agent.enable_all_ports(self.device_id)
+
# Add the pon port reference to the parent
- self.adapter_agent.add_port_reference_to_parent(device.id,
- self.pon_port)
+ # self.adapter_agent.add_port_reference_to_parent(device.id,
+ # self.pon_port)
# Update the connect status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
+ # device.connect_status = ConnectStatus.REACHABLE
+ # self.adapter_agent.update_device(device)
# re-add uni port to logical device
- parent_device = self.adapter_agent.get_device(device.parent_id)
- logical_device_id = parent_device.parent_id
- assert logical_device_id
- port_no = device.proxy_address.channel_id
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- id='uni-{}'.format(port_no),
- ofp_port=ofp_port(
- port_no=port_no,
- hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- name='uni-{}'.format(port_no),
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=self.uni_port.port_no
- ))
+ # parent_device = self.adapter_agent.get_device(device.parent_id)
+ # logical_device_id = parent_device.parent_id
+ # assert logical_device_id
+ # port_no = device.proxy_address.channel_id
+ # cap = OFPPF_1GB_FD | OFPPF_FIBER
+ # self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+ # id='uni-{}'.format(port_no),
+ # ofp_port=ofp_port(
+ # port_no=port_no,
+ # hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ # name='uni-{}'.format(port_no),
+ # config=0,
+ # state=OFPPS_LIVE,
+ # curr=cap,
+ # advertised=cap,
+ # peer=cap,
+ # curr_speed=OFPPF_1GB_FD,
+ # max_speed=OFPPF_1GB_FD
+ # ),
+ # device_id=device.id,
+ # device_port_no=self.uni_port.port_no
+ # ))
- device = self.adapter_agent.get_device(device.id)
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
+ # device = self.adapter_agent.get_device(device.id)
+ # device.oper_status = OperStatus.ACTIVE
+ # self.adapter_agent.update_device(device)
- self.log.info('re-enabled', device_id=device.id)
+ yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
+
+ self.log.info('re-enabled', device_id=self.device_id)
except Exception, e:
self.log.exception('error-reenabling', e=e)
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 619bd17..78a8a5a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -49,7 +49,7 @@
DefaultReturnErrors = true
DefaultConsumerMaxwait = 50
DefaultMaxProcessingTime = 100
- DefaultRequestTimeout = 200 // 200 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
)
type consumerChannels struct {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a0e25f3..bccb227 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,6 +18,7 @@
import (
"context"
"github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
ca "github.com/opencord/voltha-go/protos/core_adapter"
@@ -37,28 +38,47 @@
return &proxy
}
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+ if success {
+ return nil
+ } else {
+ unpackResult := &ca.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ }
+}
+
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
+ rpc := "adopt_device"
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
Value: device,
}
- success, result := ap.kafkaProxy.InvokeRPC(ctx, "adopt_device", &topic, true, args...)
- log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success, "result": result})
- if success {
- return nil
- } else {
- unpackResult := &ca.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("AdoptDevice-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
- // TODO: Need to get the real error code
- return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "disable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
}
+ success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+ log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
@@ -76,7 +96,7 @@
return nil, nil
}
-func (ap *AdapterProxy) ReconcileDevice(device voltha.Device) error {
+func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
log.Debug("ReconcileDevice")
return nil
}
@@ -86,22 +106,26 @@
return nil
}
-func (ap *AdapterProxy) DisableDevice(device voltha.Device) error {
- log.Debug("DisableDevice")
- return nil
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+ rpc := "reenable_device"
+ topic := kafka.Topic{Name: device.Type}
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+ log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) ReEnableDevice(device voltha.Device) error {
- log.Debug("ReEnableDevice")
- return nil
-}
-
-func (ap *AdapterProxy) RebootDevice(device voltha.Device) error {
+func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
log.Debug("RebootDevice")
return nil
}
-func (ap *AdapterProxy) DeleteDevice(device voltha.Device) error {
+func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
log.Debug("DeleteDevice")
return nil
}
@@ -172,7 +196,7 @@
}
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
- log.Debugw("GetOfpDeviceInfo", log.Fields{"device": device})
+ log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
@@ -180,7 +204,7 @@
Value: device,
}
success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
- log.Debugw("GetOfpDeviceInfo-response", log.Fields{"device": device, "success": success, "result": result})
+ log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
unpackResult := &ca.SwitchCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
@@ -201,7 +225,7 @@
}
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
- log.Debug("GetOfpPortInfo", log.Fields{"device": device})
+ log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
topic := kafka.Topic{Name: device.Type}
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
@@ -215,7 +239,7 @@
}
success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
- log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "device": device, "success": success, "result": result})
+ log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
unpackResult := &ca.PortCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bfc4ee4..7ae9f1a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -25,6 +25,7 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "reflect"
)
type AdapterRequestHandlerProxy struct {
@@ -89,6 +90,24 @@
}
}
+// updatePartialDeviceData updates a subset of a device that an Adapter can update.
+// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
+func (rhp *AdapterRequestHandlerProxy) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
+ // First retrieve the most up to date device info
+ var currentDevice *voltha.Device
+ var err error
+ if currentDevice, err = rhp.deviceMgr.getDevice(device.Id); err != nil {
+ return nil, err
+ }
+ cloned := reflect.ValueOf(currentDevice).Elem().Interface().(voltha.Device)
+ cloned.Root = device.Root
+ cloned.Vendor = device.Vendor
+ cloned.Model = device.Model
+ cloned.SerialNumber = device.SerialNumber
+ cloned.MacAddress = device.MacAddress
+ return &cloned, nil
+}
+
func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -100,15 +119,21 @@
log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
- log.Debugw("DeviceUpdate", log.Fields{"device": device})
+ log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
if rhp.TestMode { // Execute only for test cases
return new(empty.Empty), nil
}
- if err := rhp.deviceMgr.updateDevice(device); err != nil {
- log.Debugw("DeviceUpdate-error", log.Fields{"device": device, "error": err})
+
+ //Merge the adapter device info (only the ones an adapter can change) with the latest device data
+ if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ } else {
+ // An adapter request needs an Ack without having to wait for the update to be
+ // completed. We therefore run the update in its own routine.
+ go rhp.deviceMgr.updateDevice(updatedDevice)
}
+
return new(empty.Empty), nil
}
@@ -137,28 +162,30 @@
err := errors.New("invalid-number-of-args")
return nil, err
}
- pID := &voltha.ID{}
- if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
- }
- // Porttype is an enum sent as an integer proto
+ deviceId := &voltha.ID{}
pt := &ca.IntType{}
- if err := ptypes.UnmarshalAny(args[1].Value, pt); err != nil {
- log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
- return nil, err
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_type":
+ if err := ptypes.UnmarshalAny(arg.Value, pt); err != nil {
+ log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
-
- log.Debugw("GetPorts", log.Fields{"deviceID": pID.Id, "portype": pt.Val})
-
+ log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
if rhp.TestMode { // Execute only for test cases
aPort := &voltha.Port{Label: "test_port"}
allPorts := &voltha.Ports{}
allPorts.Items = append(allPorts.Items, aPort)
return allPorts, nil
}
- return nil, nil
-
+ return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
@@ -254,29 +281,69 @@
log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
return nil, err
}
- if operStatus.Val == -1 {
- operStatus = nil
- }
+ //if operStatus.Val == -1 {
+ // operStatus = nil
+ //}
case "connect_status":
if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
return nil, err
}
- if connStatus.Val == -1 {
- connStatus = nil
- }
+ //if connStatus.Val == -1 {
+ // connStatus = nil
+ //}
}
}
-
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
-
if rhp.TestMode { // Execute only for test cases
return nil, nil
}
- if err := rhp.deviceMgr.updateDeviceState(deviceId.Id, operStatus, connStatus); err != nil {
- log.Debugw("DeviceUpdate-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
+
+ // When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
+ go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+ return new(empty.Empty), nil
+}
+
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+ if len(args) < 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
}
+ deviceId := &voltha.ID{}
+ portType := &ca.IntType{}
+ portNo := &ca.IntType{}
+ operStatus := &ca.IntType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "oper_status":
+ if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+ log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_type":
+ if err := ptypes.UnmarshalAny(arg.Value, portType); err != nil {
+ log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_no":
+ if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+ log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
+ return nil, err
+ }
+
+ }
+ }
+ log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+ if rhp.TestMode { // Execute only for test cases
+ return nil, nil
+ }
+ go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val), voltha.OperStatus_OperStatus(operStatus.Val))
return new(empty.Empty), nil
}
@@ -309,10 +376,14 @@
return nil, nil
}
- if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
- log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
+ // Run port creation in its own go routine
+ go rhp.deviceMgr.addPort(deviceId.Id, port)
+
+ //if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+ // log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
+ // return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ //}
+ // Return an Ack
return new(empty.Empty), nil
}
@@ -346,10 +417,14 @@
return nil, nil
}
- if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
- log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
+ // Run PM config update in its own go routine
+ go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+
+ //if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+ // log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
+ // return nil, status.Errorf(codes.Internal, "%s", err.Error())
+ //}
+ // Return an Ack
return new(empty.Empty), nil
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 06f3ca3..480e32f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -24,7 +24,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc"
- "reflect"
)
type Core struct {
@@ -35,8 +34,8 @@
grpcNBIAPIHanfler *APIHandler
config *config.RWCoreFlags
kmp *kafka.KafkaMessagingProxy
- clusterDataRoot *model.Root
- localDataRoot *model.Root
+ clusterDataRoot model.Root
+ localDataRoot model.Root
clusterDataProxy *model.Proxy
localDataProxy *model.Proxy
exitChannel chan int
@@ -52,10 +51,10 @@
core.exitChannel = make(chan int, 1)
core.config = cf
// TODO: Setup the KV store
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
- core.clusterDataProxy = core.clusterDataRoot.Node.GetProxy("/", false)
- core.localDataProxy = core.localDataRoot.Node.GetProxy("/", false)
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+ core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
+ core.localDataProxy = core.localDataRoot.GetProxy("/", false)
return &core
}
@@ -79,7 +78,7 @@
log.Info("core-stopped")
}
-//startGRPCService creates the grpc service handler, registers it to the grpc server
+//startGRPCService creates the grpc service handlers, registers it to the grpc server
// and starts the server
func (core *Core) startGRPCService(ctx context.Context) {
// create an insecure gserver server
@@ -129,7 +128,7 @@
requestProxy := NewAdapterRequestHandlerProxy(dMgr, ldMgr, cdProxy, ldProxy)
core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
- log.Info("request-handler")
+ log.Info("request-handlers")
return nil
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 805dd21..d9dacbc 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,6 +17,9 @@
import (
"context"
+ "reflect"
+ "sync"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
@@ -24,7 +27,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
)
type DeviceAgent struct {
@@ -33,64 +35,160 @@
adapterProxy *AdapterProxy
deviceMgr *DeviceManager
clusterDataProxy *model.Proxy
+ deviceProxy *model.Proxy
exitChannel chan int
+ lockDevice sync.RWMutex
}
func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
var agent DeviceAgent
- device.Id = CreateDeviceId()
- agent.deviceId = device.Id
agent.adapterProxy = ap
- agent.lastData = device
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ cloned.Id = CreateDeviceId()
+ cloned.AdminState = voltha.AdminState_PREPROVISIONED
+ agent.deviceId = cloned.Id
+ agent.lastData = cloned
agent.deviceMgr = deviceMgr
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
+ agent.lockDevice = sync.RWMutex{}
return &agent
}
func (agent *DeviceAgent) start(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
// Add the initial device to the local model
if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
+ agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
+ //agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
+ agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
log.Debug("device-agent-started")
}
func (agent *DeviceAgent) Stop(ctx context.Context) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
agent.exitChannel <- 1
log.Debug("device-agent-stopped")
}
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// This function is meant so that we do not have duplicate code all over the device agent functions
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+ if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ cloned := proto.Clone(d).(*voltha.Device)
+ return cloned, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
- log.Debugw("enableDevice", log.Fields{"id": agent.lastData.Id, "device": agent.lastData})
- // Update the device status
- if device, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- cloned.AdminState = voltha.AdminState_ENABLED
- cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
- } else {
- if err := agent.adapterProxy.AdoptDevice(ctx, &cloned); err != nil {
- log.Debugw("enableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ if device.AdminState == voltha.AdminState_ENABLED {
+ log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ return nil
+ }
+ // Verify whether we need to adopt the device the first time
+ // TODO: A state machine for these state transitions would be better (we just have to handle
+ // a limited set of states now or it may be an overkill)
+ if device.AdminState == voltha.AdminState_PREPROVISIONED {
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
+ log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
}
- agent.lastData = &cloned
+ } else {
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
+ log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ }
+ // Received an Ack (no error found above). Now update the device in the model to the expected state
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_ENABLED
+ cloned.OperStatus = voltha.OperStatus_ACTIVATING
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
return nil
}
-func (agent *DeviceAgent) getNNIPorts(ctx context.Context) *voltha.Ports {
- log.Debugw("getNNIPorts", log.Fields{"id": agent.deviceId})
+func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
+ log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState == voltha.AdminState_DISABLED {
+ log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
+ //TODO: Needs customized error message
+ agent.lockDevice.Unlock()
+ return nil
+ }
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
+ log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ agent.lockDevice.Unlock()
+ return err
+ }
+ // Received an Ack (no error found above). Now update the device in the model to the expected state
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_DISABLED
+ // Set the state of all ports on that device to disable
+ for _, port := range cloned.Ports {
+ port.AdminState = voltha.AdminState_DISABLED
+ port.OperStatus = voltha.OperStatus_UNKNOWN
+ }
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ agent.lockDevice.Unlock()
+ //TODO: callback will be invoked to handle this state change
+ //For now force the state transition to happen
+ if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+ log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+ return err
+ }
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
ports := &voltha.Ports{}
if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
for _, port := range device.Ports {
- if port.Type == voltha.Port_ETHERNET_NNI {
+ if port.Type == portType {
ports.Items = append(ports.Items, port)
}
}
@@ -128,15 +226,25 @@
}
}
+func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+ log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
+ log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+ return nil
+}
+
func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
// Get the dev info from the model
- if storedData, err := agent.deviceMgr.getDevice(device.Id); err != nil {
+ if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", device.Id)
} else {
// store the changed data
- cloned := (proto.Clone(device)).(*voltha.Device)
+ cloned := proto.Clone(device).(*voltha.Device)
afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ agent.lockDevice.Unlock()
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -149,26 +257,77 @@
}
}
-func (agent *DeviceAgent) updateDeviceState(operState *core_adapter.IntType, connState *core_adapter.IntType) error {
+func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- if operState != nil {
- cloned.OperStatus = voltha.OperStatus_OperStatus(operState.Val)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+ if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
+ log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ cloned.ConnectStatus = connStatus
}
- if connState != nil {
- cloned.ConnectStatus = voltha.ConnectStatus_ConnectStatus(connState.Val)
+ if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
+ log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ cloned.OperStatus = operStatus
}
- log.Debugw("DeviceStateUpdate-device", log.Fields{"device": cloned})
+ log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
+ agent.lockDevice.Unlock()
// Perform the state transition
- if err := agent.deviceMgr.processTransition(storeDevice, &cloned); err != nil {
+ if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
+ log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
+ return err
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+ agent.lockDevice.Lock()
+ //defer agent.lockDevice.Unlock()
+ // Work only on latest data
+ // TODO: Get list of ports from device directly instead of the entire device
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+ if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.InvalidArgument, "%s", portType)
+ }
+ for _, port := range cloned.Ports {
+ if port.Type == portType && port.PortNo == portNo {
+ port.OperStatus = operStatus
+ // Set the admin status to ENABLED if the operational status is ACTIVE
+ // TODO: Set by northbound system?
+ if operStatus == voltha.OperStatus_ACTIVE {
+ port.AdminState = voltha.AdminState_ENABLED
+ }
+ break
+ }
+ }
+ log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+ // Store the device
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ agent.lockDevice.Unlock()
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ agent.lockDevice.Unlock()
+ // Perform the state transition
+ if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
return err
}
@@ -177,17 +336,18 @@
}
func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debug("updatePmConfigs")
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- cp := proto.Clone(pmConfigs)
- cloned.PmConfigs = cp.(*voltha.PmConfigs)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -196,21 +356,57 @@
}
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
- log.Debug("addPort")
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
// Work only on latest data
- if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
if cloned.Ports == nil {
// First port
+ log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
cloned.Ports = make([]*voltha.Port, 0)
}
- cp := proto.Clone(port)
- cloned.Ports = append(cloned.Ports, cp.(*voltha.Port))
+ cp := proto.Clone(port).(*voltha.Port)
+ // Set the admin state of the port to ENABLE if the operational state is ACTIVE
+ // TODO: Set by northbound system?
+ if cp.OperStatus == voltha.OperStatus_ACTIVE {
+ cp.AdminState = voltha.AdminState_ENABLED
+ }
+ cloned.Ports = append(cloned.Ports, cp)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
+func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("addPeerPort")
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ // Get the peer port on the device based on the port no
+ for _, peerPort := range cloned.Ports {
+ if peerPort.PortNo == port.PortNo { // found port
+ cp := proto.Clone(port).(*voltha.Port_PeerPort)
+ peerPort.Peers = append(peerPort.Peers, cp)
+ log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
+ break
+ }
+ }
+ // Store the device
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -220,12 +416,14 @@
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
if value == nil {
return
}
var storeDevice *voltha.Device
var err error
- if storeDevice, err = agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+ if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
return
}
updated := false
@@ -247,10 +445,10 @@
}
}
}
- log.Debugw("update-field-status", log.Fields{"device": storeDevice, "name": name, "updated": updated})
+ log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
// Save the data
- cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index fd18c10..e3dbed2 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -18,6 +18,7 @@
import (
"context"
"errors"
+ "github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
@@ -95,18 +96,18 @@
}
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
- log.Debugw("createDevice-start", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+ log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
agent.start(ctx)
- sendResponse(ctx, ch, nil)
+ sendResponse(ctx, ch, agent.lastData)
}
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- log.Debugw("enableDevice-start", log.Fields{"deviceid": id})
+ log.Debugw("enableDevice", log.Fields{"deviceid": id})
var res interface{}
if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
@@ -119,33 +120,44 @@
sendResponse(ctx, ch, res)
}
-func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
- log.Debugw("getDevice-start", log.Fields{"deviceid": id})
+func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+ log.Debugw("disableDevice", log.Fields{"deviceid": id})
- if device := dMgr.clusterDataProxy.Get("/devices/"+id, 1, false, ""); device == nil {
- return nil, status.Errorf(codes.NotFound, "%s", id)
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+ res = agent.disableDevice(ctx)
+ log.Debugw("disableDevice-result", log.Fields{"result": res})
} else {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- return &cloned, nil
+ res = status.Errorf(codes.NotFound, "%s", id.Id)
}
+
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
+ log.Debugw("getDevice", log.Fields{"deviceid": id})
+ if agent := dMgr.getDeviceAgent(id); agent != nil {
+ return agent.getDevice()
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", id)
}
func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
- log.Debug("ListDevices-start")
+ log.Debug("ListDevices")
result := &voltha.Devices{}
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
for _, agent := range dMgr.deviceAgents {
- if device := dMgr.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
- cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
- result.Items = append(result.Items, &cloned)
+ if device, err := agent.getDevice(); err == nil {
+ cloned := proto.Clone(device).(*voltha.Device)
+ result.Items = append(result.Items, cloned)
}
}
return result, nil
}
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
- log.Debugw("updateDevice-start", log.Fields{"deviceid": device.Id, "device": device})
+ log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
return agent.updateDevice(device)
@@ -155,9 +167,23 @@
func (dMgr *DeviceManager) addPort(deviceId string, port *voltha.Port) error {
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.addPort(port)
+ if err := agent.addPort(port); err != nil {
+ return err
+ }
+ // Setup peer ports
+ meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceId, PortNo: port.PortNo}
+ for _, peerPort := range port.Peers {
+ if agent := dMgr.getDeviceAgent(peerPort.DeviceId); agent != nil {
+ if err := agent.addPeerPort(meAsPeer); err != nil {
+ log.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
+ return err
+ }
+ }
+ }
+ return nil
+ } else {
+ return status.Errorf(codes.NotFound, "%s", deviceId)
}
- return status.Errorf(codes.NotFound, "%s", deviceId)
}
func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
@@ -168,7 +194,7 @@
}
func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
- log.Debugw("getSwitchCapability-start", log.Fields{"deviceid": deviceId})
+ log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -176,17 +202,18 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) getNNIPorts(ctx context.Context, deviceId string) (*voltha.Ports, error) {
- log.Debugw("getNNIPorts-start", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+ log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.getNNIPorts(ctx), nil
+ return agent.getPorts(ctx, portType), nil
}
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
}
func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
- log.Debugw("getPortCapability-start", log.Fields{"deviceid": deviceId})
+ log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -194,20 +221,27 @@
return nil, status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) updateDeviceState(deviceId string, operState *core_adapter.IntType, connState *core_adapter.IntType) error {
- log.Debugw("updateDeviceState-start", log.Fields{"deviceid": deviceId, "operState": operState, "connState": connState})
+func (dMgr *DeviceManager) updateDeviceStatus(deviceId string, operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+ log.Debugw("updateDeviceStatus", log.Fields{"deviceid": deviceId, "operStatus": operStatus, "connStatus": connStatus})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
- return agent.updateDeviceState(operState, connState)
+ return agent.updateDeviceStatus(operStatus, connStatus)
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+func (dMgr *DeviceManager) updatePortState(deviceId string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+ log.Debugw("updatePortState", log.Fields{"deviceid": deviceId, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.updatePortState(portType, portNo, operStatus)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
func (dMgr *DeviceManager) childDeviceDetected(parentDeviceId string, parentPortNo int64, deviceType string, channelId int64) error {
- log.Debugw("childDeviceDetected-start", log.Fields{"parentDeviceId": parentDeviceId})
+ log.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceId})
// Create the ONU device
childDevice := &voltha.Device{}
- childDevice.Id = CreateDeviceId()
childDevice.Type = deviceType
childDevice.ParentId = parentDeviceId
childDevice.ParentPortNo = uint32(parentPortNo)
@@ -220,7 +254,7 @@
agent.start(nil)
// Activate the child device
- if agent := dMgr.getDeviceAgent(childDevice.Id); agent != nil {
+ if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
return agent.enableDevice(nil)
}
@@ -229,16 +263,25 @@
func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {
// This will be triggered on every update to the device.
- handler := dMgr.stateTransitions.GetTransitionHandler(previous, current)
- if handler != nil {
- log.Debugw("found-handler", log.Fields{"handler": funcName(handler)})
- return handler(previous, current)
+ handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
+ if handlers == nil {
+ log.Debugw("handlers-not-found", log.Fields{"deviceId": current.Id})
+ return nil
}
- log.Debugw("handler-not-found", log.Fields{"deviceId": current.Id})
+ for _, handler := range handlers {
+ log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(current); err != nil {
+ return err
+ }
+ }
+ //if handler != nil {
+ // log.Debugw("found-handlers", log.Fields{"handlers": funcName(handler)})
+ // return handler(current)
+ //}
return nil
}
-func (dMgr *DeviceManager) createLogicalDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) createLogicalDevice(cDevice *voltha.Device) error {
log.Info("createLogicalDevice")
var logicalId *string
var err error
@@ -251,7 +294,80 @@
return nil
}
-func (dMgr *DeviceManager) addUNILogicalPort(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) deleteLogicalDevice(cDevice *voltha.Device) error {
+ log.Info("deleteLogicalDevice")
+ var err error
+ if err = dMgr.logicalDeviceMgr.DeleteLogicalDevice(nil, cDevice); err != nil {
+ log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
+ return err
+ }
+ // Remove the logical device Id from the parent device
+ logicalId := ""
+ dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+ return nil
+}
+
+func (dMgr *DeviceManager) deleteLogicalPort(cDevice *voltha.Device) error {
+ log.Info("deleteLogicalPort")
+ var err error
+ if err = dMgr.logicalDeviceMgr.DeleteLogicalPort(nil, cDevice); err != nil {
+ log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": cDevice.Id})
+ return err
+ }
+ //// Remove the logical device Id from the parent device
+ //logicalId := ""
+ //dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+ return nil
+}
+
+func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+ // Sanity check
+ if childDevice.Root {
+ // childDevice is the parent device
+ return childDevice
+ }
+ parentDevice, _ := dMgr.getDevice(childDevice.ParentId)
+ return parentDevice
+}
+
+func (dMgr *DeviceManager) disableAllChildDevices(cDevice *voltha.Device) error {
+ log.Debug("disableAllChildDevices")
+ var childDeviceIds []string
+ var err error
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(cDevice); err != nil {
+ return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ }
+ if len(childDeviceIds) == 0 {
+ log.Debugw("no-child-device", log.Fields{"deviceId": cDevice.Id})
+ }
+ for _, childDeviceId := range childDeviceIds {
+ if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+ if err = agent.disableDevice(nil); err != nil {
+ log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+ }
+ }
+ }
+ return nil
+}
+
+func (dMgr *DeviceManager) getAllChildDeviceIds(cDevice *voltha.Device) ([]string, error) {
+ log.Info("getAllChildDeviceIds")
+ // Get latest device info
+ var device *voltha.Device
+ var err error
+ if device, err = dMgr.getDevice(cDevice.Id); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", cDevice.Id)
+ }
+ childDeviceIds := make([]string, 0)
+ for _, port := range device.Ports {
+ for _, peer := range port.Peers {
+ childDeviceIds = append(childDeviceIds, peer.DeviceId)
+ }
+ }
+ return childDeviceIds, nil
+}
+
+func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
log.Info("addUNILogicalPort")
if err := dMgr.logicalDeviceMgr.AddUNILogicalPort(nil, cDevice); err != nil {
log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -260,32 +376,32 @@
return nil
}
-func (dMgr *DeviceManager) activateDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
log.Info("activateDevice")
return nil
}
-func (dMgr *DeviceManager) disableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
- log.Info("disableDevice")
+func (dMgr *DeviceManager) disableDeviceHandler(cDevice *voltha.Device) error {
+ log.Info("disableDevice-donothing")
return nil
}
-func (dMgr *DeviceManager) abandonDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) abandonDevice(cDevice *voltha.Device) error {
log.Info("abandonDevice")
return nil
}
-func (dMgr *DeviceManager) reEnableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) reEnableDevice(cDevice *voltha.Device) error {
log.Info("reEnableDevice")
return nil
}
-func (dMgr *DeviceManager) noOp(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) noOp(cDevice *voltha.Device) error {
log.Info("noOp")
return nil
}
-func (dMgr *DeviceManager) notAllowed(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
log.Info("notAllowed")
return errors.New("Transition-not-allowed")
}
@@ -304,7 +420,7 @@
func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
if device, _ := dMgr.getDevice(deviceId); device != nil {
- log.Infow("GetParentDeviceId", log.Fields{"device": device})
+ log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
return &device.ParentId
}
return nil
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 0f0239c..a84f03a 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -34,13 +34,13 @@
Operational voltha.OperStatus_OperStatus
}
-type TransitionHandler func(*voltha.Device, *voltha.Device) error
+type TransitionHandler func(*voltha.Device) error
type Transition struct {
deviceType DeviceType
previousState DeviceState
currentState DeviceState
- handler TransitionHandler
+ handlers []TransitionHandler
}
type TransitionMap struct {
@@ -55,88 +55,93 @@
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.activateDevice})
+ handlers: []TransitionHandler{dMgr.activateDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.activateDevice})
+ handlers: []TransitionHandler{dMgr.activateDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- handler: dMgr.createLogicalDevice})
+ handlers: []TransitionHandler{dMgr.createLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
- handler: dMgr.addUNILogicalPort})
+ handlers: []TransitionHandler{dMgr.addUNILogicalPort}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
- deviceType: any,
+ deviceType: parent,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.disableDevice})
-
+ handlers: []TransitionHandler{dMgr.deleteLogicalDevice, dMgr.disableAllChildDevices}})
+ transitionMap.transitions = append(transitionMap.transitions,
+ Transition{
+ deviceType: child,
+ previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+ handlers: []TransitionHandler{dMgr.deleteLogicalPort}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.abandonDevice})
+ handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.abandonDevice})
+ handlers: []TransitionHandler{dMgr.abandonDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.reEnableDevice})
+ handlers: []TransitionHandler{dMgr.reEnableDevice}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: any,
previousState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
- handler: dMgr.notAllowed})
+ handlers: []TransitionHandler{dMgr.notAllowed}})
return &transitionMap
}
@@ -146,38 +151,38 @@
}
// isMatched matches a state transition. It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) (TransitionHandler, bool) {
+func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, bool) {
// Do we have an exact match?
if *previous == transition.previousState && *current == transition.currentState {
- return transition.handler, true
+ return transition.handlers, true
}
// If the admin state changed then prioritize it first
if previous.Admin != current.Admin {
if previous.Admin == transition.previousState.Admin && current.Admin == transition.currentState.Admin {
- return transition.handler, false
+ return transition.handlers, false
}
}
// If the operational state changed then prioritize it in second position
if previous.Operational != current.Operational {
if previous.Operational == transition.previousState.Operational && current.Operational == transition.currentState.Operational {
- return transition.handler, false
+ return transition.handlers, false
}
}
// If the connection state changed then prioritize it in third position
if previous.Connection != current.Connection {
if previous.Connection == transition.previousState.Connection && current.Connection == transition.currentState.Connection {
- return transition.handler, false
+ return transition.handlers, false
}
}
return nil, false
}
-func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) []TransitionHandler {
//1. Get the previous and current set of states
pState := getDeviceStates(pDevice)
cState := getDeviceStates(cDevice)
- log.Infow("DeviceType", log.Fields{"device": pDevice})
+ //log.Infow("DeviceType", log.Fields{"device": pDevice})
deviceType := parent
if !pDevice.Root {
log.Info("device is child")
@@ -186,8 +191,8 @@
log.Infof("deviceType:%d-deviceId:%s-previous:%v-current:%v", deviceType, pDevice.Id, pState, cState)
//2. Go over transition array to get the right transition
- var currentMatch TransitionHandler
- var tempHandler TransitionHandler
+ var currentMatch []TransitionHandler
+ var tempHandler []TransitionHandler
var exactMatch bool
var deviceTypeMatch bool
for _, aTransition := range tMap.transitions {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index bd28322..d446438 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -48,19 +48,14 @@
func (handler *APIHandler) UpdateLogLevel(ctx context.Context, logging *voltha.Logging) (*empty.Empty, error) {
log.Debugw("UpdateLogLevel-request", log.Fields{"newloglevel": logging.Level, "intval": int(logging.Level)})
- if isTestMode(ctx) {
- out := new(empty.Empty)
- log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
- return out, nil
- }
- return nil, errors.New("Unimplemented")
-
+ out := new(empty.Empty)
+ log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
+ return out, nil
}
func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan error) {
log.Debugw("processEnableDevicePort", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
ch <- status.Errorf(100, "%d-%s", 100, "erreur")
-
}
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
@@ -141,15 +136,17 @@
go handler.deviceMgr.createDevice(ctx, device, ch)
select {
case res := <-ch:
- if res == nil {
- return &voltha.Device{Id: device.Id}, nil
- } else if err, ok := res.(error); ok {
- return &voltha.Device{Id: device.Id}, err
- } else {
- log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return &voltha.Device{Id: device.Id}, err
+ if res != nil {
+ if err, ok := res.(error); ok {
+ return &voltha.Device{}, err
+ }
+ if d, ok := res.(*voltha.Device); ok {
+ return d, nil
+ }
}
+ log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
+ err := status.Errorf(codes.Internal, "%s", res)
+ return &voltha.Device{}, err
case <-ctx.Done():
log.Debug("createdevice-client-timeout")
return nil, ctx.Err()
@@ -188,6 +185,24 @@
out := new(empty.Empty)
return out, nil
}
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.disableDevice(ctx, id, ch)
+ select {
+ case res := <-ch:
+ if res == nil {
+ return new(empty.Empty), nil
+ } else if err, ok := res.(error); ok {
+ return new(empty.Empty), err
+ } else {
+ log.Warnw("disable-device-unexpected-return-type", log.Fields{"result": res})
+ err = status.Errorf(codes.Internal, "%s", res)
+ return new(empty.Empty), err
+ }
+ case <-ctx.Done():
+ log.Debug("enabledevice-client-timeout")
+ return nil, ctx.Err()
+ }
return nil, errors.New("Unimplemented")
}
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
index 6a32ee5..21300cc 100644
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_client_test.go
@@ -39,7 +39,6 @@
Prerequite: These tests require the rw_core to run prior to executing these test cases.
*/
-
func setup() {
var err error
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 117c869..218478f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -25,17 +25,18 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
+ "sync"
)
type LogicalDeviceAgent struct {
- logicalDeviceId string
- lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- exitChannel chan int
+ logicalDeviceId string
+ lastData *voltha.LogicalDevice
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
+ lockLogicalDevice sync.RWMutex
}
func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
@@ -47,11 +48,12 @@
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
+ agent.lockLogicalDevice = sync.RWMutex{}
return &agent
}
func (agent *LogicalDeviceAgent) Start(ctx context.Context) error {
- log.Info("starting-logical_device-agent")
+ log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
var switchCap *ca.SwitchCapability
var err error
@@ -68,7 +70,7 @@
//hence. may need to extract the port by the NNI port id defined by the adapter during device
//creation
var nniPorts *voltha.Ports
- if nniPorts, err = agent.deviceMgr.getNNIPorts(ctx, agent.rootDeviceId); err != nil {
+ if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
}
var portCap *ca.PortCapability
@@ -80,8 +82,11 @@
}
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ lp.DeviceId = agent.rootDeviceId
ld.Ports = append(ld.Ports, lp)
}
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
// Save the logical device
if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -92,8 +97,30 @@
return nil
}
+func (agent *LogicalDeviceAgent) getLogicalDevice() (*voltha.LogicalDevice, error) {
+ log.Debug("getLogicalDevice")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+ return cloned, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
+ log.Debug("getLogicalDeviceWithoutLock")
+ logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+ cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+ return cloned, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, portNo uint32) error {
- log.Info("addUNILogicalPort-start")
+ log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
var portCap *ca.PortCapability
var err error
@@ -101,29 +128,67 @@
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
return err
}
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
// Get stored logical device
- if ldevice, err := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); err != nil {
+ if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
return status.Error(codes.NotFound, agent.logicalDeviceId)
} else {
- cloned := reflect.ValueOf(ldevice).Elem().Interface().(voltha.LogicalDevice)
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+ cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
+ lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
+ lp.DeviceId = childDevice.Id
cloned.Ports = append(cloned.Ports, lp)
- afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, &cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-add-UNI-port:%s", agent.logicalDeviceId)
- }
+ return agent.updateLogicalDeviceWithoutLock(cloned)
+ }
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
+ cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
+ afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
+ }
+ return nil
+}
+
+// deleteLogicalPort removes the logical port associated with a child device
+func (agent *LogicalDeviceAgent) deleteLogicalPort(device *voltha.Device) error {
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ // Get the most up to date logical device
+ var logicaldevice *voltha.LogicalDevice
+ if logicaldevice, _ = agent.getLogicalDeviceWithoutLock(); logicaldevice == nil {
+ log.Debugw("no-logical-device", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "deviceId": device.Id})
return nil
}
+ index := -1
+ for i, logicalPort := range logicaldevice.Ports {
+ if logicalPort.DeviceId == device.Id {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ copy(logicaldevice.Ports[index:], logicaldevice.Ports[index+1:])
+ logicaldevice.Ports[len(logicaldevice.Ports)-1] = nil
+ logicaldevice.Ports = logicaldevice.Ports[:len(logicaldevice.Ports)-1]
+ log.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ return agent.updateLogicalDeviceWithoutLock(logicaldevice)
+ }
+ return nil
}
func (agent *LogicalDeviceAgent) Stop(ctx context.Context) {
log.Info("stopping-logical_device-agent")
+ agent.lockLogicalDevice.Lock()
+ defer agent.lockLogicalDevice.Unlock()
+ //Remove the logical device from the model
+ if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+ log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ } else {
+ log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+ }
agent.exitChannel <- 1
log.Info("logical_device-agent-stopped")
}
-
-func (agent *LogicalDeviceAgent) getLogicalDevice(ctx context.Context) *voltha.LogicalDevice {
- log.Debug("getLogicalDevice")
- cp := proto.Clone(agent.lastData)
- return cp.(*voltha.LogicalDevice)
-}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index aa22d57..bef078c 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -24,7 +24,6 @@
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
"strings"
"sync"
)
@@ -78,33 +77,36 @@
return nil
}
+func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
+ ldMgr.lockLogicalDeviceAgentsMap.Lock()
+ defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+ delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+}
+
+// getLogicalDevice provides a cloned most up to date logical device
func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
- log.Debugw("getlogicalDevice-start", log.Fields{"logicaldeviceid": id})
- logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+id, 1, false, "")
- if logicalDevice != nil {
- cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
- return &cloned, nil
+ log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
+ if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+ return agent.getLogicalDevice()
}
return nil, status.Errorf(codes.NotFound, "%s", id)
}
func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
- log.Debug("listLogicalDevices-start")
+ log.Debug("listLogicalDevices")
result := &voltha.LogicalDevices{}
ldMgr.lockLogicalDeviceAgentsMap.Lock()
defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
for _, agent := range ldMgr.logicalDeviceAgents {
- logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
- if logicalDevice != nil {
- cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
- result.Items = append(result.Items, &cloned)
+ if lDevice, err := agent.getLogicalDevice(); err == nil {
+ result.Items = append(result.Items, lDevice)
}
}
return result, nil
}
func (ldMgr *LogicalDeviceManager) CreateLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
- log.Infow("creating-logical-device-start", log.Fields{"deviceId": device.Id})
+ log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
// Sanity check
if !device.Root {
return nil, errors.New("Device-not-root")
@@ -116,18 +118,74 @@
// in the Device model. May need to be moved out.
macAddress := device.MacAddress
id := strings.Replace(macAddress, ":", "", -1)
- log.Debugw("setting-logical-device-id", log.Fields{"logicaldeviceId": id})
+ if id == "" {
+ log.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("mac-address-not-set")
+ }
+ log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.Start(ctx)
- log.Info("creating-logical-device-ends")
+ log.Debug("creating-logical-device-ends")
return &id, nil
}
+func (ldMgr *LogicalDeviceManager) DeleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
+ // Sanity check
+ if !device.Root {
+ return errors.New("Device-not-root")
+ }
+ logDeviceId := device.ParentId
+ if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
+ // Stop the logical device agent
+ agent.Stop(ctx)
+ //Remove the logical device agent from the Map
+ ldMgr.deleteLogicalDeviceAgent(logDeviceId)
+ }
+
+ log.Debug("deleting-logical-device-ends")
+ return nil
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
+ // Device can either be a parent or a child device
+ if device.Root {
+ // Parent device. The ID of a parent device is the logical device ID
+ return &device.ParentId, nil
+ }
+ // Device is child device
+ // retrieve parent device using child device ID
+ if parentDevice := ldMgr.deviceMgr.getParentDevice(device); parentDevice != nil {
+ return &parentDevice.ParentId, nil
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+// DeleteLogicalDevice removes the logical port associated with a child device
+func (ldMgr *LogicalDeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("deleting-logical-port", log.Fields{"deviceId": device.Id})
+ // Sanity check
+ if device.Root {
+ return errors.New("Device-root")
+ }
+ logDeviceId, _ := ldMgr.getLogicalDeviceId(device)
+ if logDeviceId == nil {
+ log.Debugw("no-logical-device-present", log.Fields{"deviceId": device.Id})
+ return nil
+ }
+ if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+ agent.deleteLogicalPort(device)
+ }
+
+ log.Debug("deleting-logical-port-ends")
+ return nil
+}
+
func (ldMgr *LogicalDeviceManager) AddUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
- log.Infow("AddUNILogicalPort-start", log.Fields{"deviceId": childDevice.Id})
+ log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -137,7 +195,7 @@
parentId := childDevice.ParentId
logDeviceId := ldMgr.deviceMgr.GetParentDeviceId(parentId)
- log.Infow("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
+ log.Debugw("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
return agent.addUNILogicalPort(ctx, childDevice, childDevice.ProxyAddress.ChannelId)
diff --git a/rw_core/main.go b/rw_core/main.go
index 25adee6..f53a6ba 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -221,7 +221,7 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
defer log.CleanUp()
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 99ba2c4..b31f2ce 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -42,12 +42,12 @@
log.SetAllLogLevel(log.ErrorLevel)
coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
- kk.KafkaHost("192.168.0.20"),
+ kk.KafkaHost("192.168.0.17"),
kk.KafkaPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Core"}))
adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
- kk.KafkaHost("192.168.0.20"),
+ kk.KafkaHost("192.168.0.17"),
kk.KafkaPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
@@ -186,7 +186,7 @@
func TestGetDevice(t *testing.T) {
trnsId := uuid.New().String()
- protoMsg := &voltha.ID{Id:trnsId}
+ protoMsg := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -212,7 +212,7 @@
func TestGetDeviceTimeout(t *testing.T) {
trnsId := uuid.New().String()
- protoMsg := &voltha.ID{Id:trnsId}
+ protoMsg := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -237,7 +237,7 @@
func TestGetChildDevice(t *testing.T) {
trnsId := uuid.New().String()
- protoMsg := &voltha.ID{Id:trnsId}
+ protoMsg := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -260,7 +260,7 @@
func TestGetChildDevices(t *testing.T) {
trnsId := uuid.New().String()
- protoMsg := &voltha.ID{Id:trnsId}
+ protoMsg := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -283,7 +283,7 @@
func TestGetPorts(t *testing.T) {
trnsId := uuid.New().String()
- protoArg1 := &voltha.ID{Id:trnsId}
+ protoArg1 := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 2)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -311,7 +311,7 @@
func TestGetPortsMissingArgs(t *testing.T) {
trnsId := uuid.New().String()
- protoArg1 := &voltha.ID{Id:trnsId}
+ protoArg1 := &voltha.ID{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
@@ -443,6 +443,36 @@
assert.NotNil(t, unpackResult)
}
+func TestDeviceStateChange(t *testing.T) {
+ log.SetAllLogLevel(log.DebugLevel)
+ trnsId := uuid.New().String()
+ protoArg1 := &voltha.ID{Id: trnsId}
+ args := make([]*kk.KVArg, 4)
+ args[0] = &kk.KVArg{
+ Key: "device_id",
+ Value: protoArg1,
+ }
+ protoArg2 := &ca.IntType{Val: 1}
+ args[1] = &kk.KVArg{
+ Key: "oper_status",
+ Value: protoArg2,
+ }
+ protoArg3 := &ca.IntType{Val: 1}
+ args[2] = &kk.KVArg{
+ Key: "connect_status",
+ Value: protoArg3,
+ }
+
+ rpc := "DeviceStateUpdate"
+ topic := kk.Topic{Name: "Core"}
+ start := time.Now()
+ status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+ elapsed := time.Since(start)
+ log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ assert.Equal(t, status, true)
+ assert.Nil(t, result)
+}
+
func TestStopKafkaProxy(t *testing.T) {
adapterKafkaProxy.Stop()
coreKafkaProxy.Stop()