[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Cleanup the ponsim adapters to removed unsued references and
general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/Makefile b/adapters/Makefile
index 9374284..2531985 100644
--- a/adapters/Makefile
+++ b/adapters/Makefile
@@ -178,6 +178,7 @@
ifneq ($(VOLTHA_BUILD),docker)
make -C protos
else
+ cp ../protos/*.proto ./protos
docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-protos:${TAG} -f docker/Dockerfile.protos .
endif
diff --git a/adapters/common/openflow/utils.py b/adapters/common/openflow/utils.py
index 4547255..730c714 100644
--- a/adapters/common/openflow/utils.py
+++ b/adapters/common/openflow/utils.py
@@ -85,6 +85,7 @@
TUNNEL_ID = ofp.OFPXMT_OFB_TUNNEL_ID
IPV6_EXTHDR = ofp.OFPXMT_OFB_IPV6_EXTHDR
+
# ofp_action_* shortcuts
def output(port, max_len=ofp.OFPCML_MAX):
@@ -93,41 +94,48 @@
output=ofp.ofp_action_output(port=port, max_len=max_len)
)
+
def mpls_ttl(ttl):
return action(
type=SET_MPLS_TTL,
mpls_ttl=ofp.ofp_action_mpls_ttl(mpls_ttl=ttl)
)
+
def push_vlan(eth_type):
return action(
type=PUSH_VLAN,
push=ofp.ofp_action_push(ethertype=eth_type)
)
+
def pop_vlan():
return action(
type=POP_VLAN
)
+
def pop_mpls(eth_type):
return action(
type=POP_MPLS,
pop_mpls=ofp.ofp_action_pop_mpls(ethertype=eth_type)
)
+
def group(group_id):
return action(
type=GROUP,
group=ofp.ofp_action_group(group_id=group_id)
)
+
def nw_ttl(nw_ttl):
return action(
type=NW_TTL,
nw_ttl=ofp.ofp_action_nw_ttl(nw_ttl=nw_ttl)
)
+
def set_field(field):
return action(
type=SET_FIELD,
@@ -137,6 +145,7 @@
ofb_field=field))
)
+
def experimenter(experimenter, data):
return action(
type=EXPERIMENTER,
@@ -144,125 +153,165 @@
experimenter=experimenter, data=data)
)
+
# ofb_field generators (incomplete set)
def in_port(_in_port):
return ofb_field(type=IN_PORT, port=_in_port)
+
def in_phy_port(_in_phy_port):
return ofb_field(type=IN_PHY_PORT, port=_in_phy_port)
+
def metadata(_table_metadata):
return ofb_field(type=METADATA, table_metadata=_table_metadata)
+
def eth_dst(_eth_dst):
return ofb_field(type=ETH_DST, table_metadata=_eth_dst)
+
def eth_src(_eth_src):
return ofb_field(type=ETH_SRC, table_metadata=_eth_src)
+
def eth_type(_eth_type):
return ofb_field(type=ETH_TYPE, eth_type=_eth_type)
+
def vlan_vid(_vlan_vid):
return ofb_field(type=VLAN_VID, vlan_vid=_vlan_vid)
+
def vlan_pcp(_vlan_pcp):
return ofb_field(type=VLAN_PCP, vlan_pcp=_vlan_pcp)
+
def ip_dscp(_ip_dscp):
return ofb_field(type=IP_DSCP, ip_dscp=_ip_dscp)
+
def ip_ecn(_ip_ecn):
return ofb_field(type=IP_ECN, ip_ecn=_ip_ecn)
+
def ip_proto(_ip_proto):
return ofb_field(type=IP_PROTO, ip_proto=_ip_proto)
+
def ipv4_src(_ipv4_src):
return ofb_field(type=IPV4_SRC, ipv4_src=_ipv4_src)
+
def ipv4_dst(_ipv4_dst):
return ofb_field(type=IPV4_DST, ipv4_dst=_ipv4_dst)
+
def tcp_src(_tcp_src):
return ofb_field(type=TCP_SRC, tcp_src=_tcp_src)
+
def tcp_dst(_tcp_dst):
return ofb_field(type=TCP_DST, tcp_dst=_tcp_dst)
+
def udp_src(_udp_src):
return ofb_field(type=UDP_SRC, udp_src=_udp_src)
+
def udp_dst(_udp_dst):
return ofb_field(type=UDP_DST, udp_dst=_udp_dst)
+
def sctp_src(_sctp_src):
return ofb_field(type=SCTP_SRC, sctp_src=_sctp_src)
+
def sctp_dst(_sctp_dst):
return ofb_field(type=SCTP_DST, sctp_dst=_sctp_dst)
+
def icmpv4_type(_icmpv4_type):
return ofb_field(type=ICMPV4_TYPE, icmpv4_type=_icmpv4_type)
+
def icmpv4_code(_icmpv4_code):
return ofb_field(type=ICMPV4_CODE, icmpv4_code=_icmpv4_code)
+
def arp_op(_arp_op):
return ofb_field(type=ARP_OP, arp_op=_arp_op)
+
def arp_spa(_arp_spa):
return ofb_field(type=ARP_SPA, arp_spa=_arp_spa)
+
def arp_tpa(_arp_tpa):
return ofb_field(type=ARP_TPA, arp_tpa=_arp_tpa)
+
def arp_sha(_arp_sha):
return ofb_field(type=ARP_SHA, arp_sha=_arp_sha)
+
def arp_tha(_arp_tha):
return ofb_field(type=ARP_THA, arp_tha=_arp_tha)
+
def ipv6_src(_ipv6_src):
return ofb_field(type=IPV6_SRC, arp_tha=_ipv6_src)
+
def ipv6_dst(_ipv6_dst):
return ofb_field(type=IPV6_DST, arp_tha=_ipv6_dst)
+
def ipv6_flabel(_ipv6_flabel):
return ofb_field(type=IPV6_FLABEL, arp_tha=_ipv6_flabel)
+
def ipmpv6_type(_icmpv6_type):
return ofb_field(type=ICMPV6_TYPE, arp_tha=_icmpv6_type)
+
def icmpv6_code(_icmpv6_code):
return ofb_field(type=ICMPV6_CODE, arp_tha=_icmpv6_code)
+
def ipv6_nd_target(_ipv6_nd_target):
return ofb_field(type=IPV6_ND_TARGET, arp_tha=_ipv6_nd_target)
+
def ofb_ipv6_nd_sll(_ofb_ipv6_nd_sll):
return ofb_field(type=OFB_IPV6_ND_SLL, arp_tha=_ofb_ipv6_nd_sll)
+
def ipv6_nd_tll(_ipv6_nd_tll):
return ofb_field(type=IPV6_ND_TLL, arp_tha=_ipv6_nd_tll)
+
def mpls_label(_mpls_label):
return ofb_field(type=MPLS_LABEL, arp_tha=_mpls_label)
+
def mpls_tc(_mpls_tc):
return ofb_field(type=MPLS_TC, arp_tha=_mpls_tc)
+
def mpls_bos(_mpls_bos):
return ofb_field(type=MPLS_BOS, arp_tha=_mpls_bos)
+
def pbb_isid(_pbb_isid):
return ofb_field(type=PBB_ISID, arp_tha=_pbb_isid)
+
def tunnel_id(_tunnel_id):
return ofb_field(type=TUNNEL_ID, arp_tha=_tunnel_id)
+
def ipv6_exthdr(_ipv6_exthdr):
return ofb_field(type=IPV6_EXTHDR, arp_tha=_ipv6_exthdr)
@@ -277,6 +326,7 @@
if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
return instruction.actions.actions
+
def get_ofb_fields(flow):
assert isinstance(flow, ofp.ofp_flow_stats)
assert flow.match.type == ofp.OFPMT_OXM
@@ -286,24 +336,28 @@
ofb_fields.append(field.ofb_field)
return ofb_fields
+
def get_out_port(flow):
for action in get_actions(flow):
if action.type == OUTPUT:
return action.output.port
return None
+
def get_in_port(flow):
for field in get_ofb_fields(flow):
if field.type == IN_PORT:
return field.port
return None
+
def get_goto_table_id(flow):
for instruction in flow.instructions:
if instruction.type == ofp.OFPIT_GOTO_TABLE:
return instruction.goto_table.table_id
return None
+
def get_metadata(flow):
''' legacy get method (only want lower 32 bits '''
for field in get_ofb_fields(flow):
@@ -311,6 +365,7 @@
return field.table_metadata & 0xffffffff
return None
+
def get_metadata_64_bit(flow):
for field in get_ofb_fields(flow):
if field.type == METADATA:
@@ -364,17 +419,20 @@
def has_next_table(flow):
return get_goto_table_id(flow) is not None
+
def get_group(flow):
for action in get_actions(flow):
if action.type == GROUP:
return action.group.group_id
return None
+
def has_group(flow):
return get_group(flow) is not None
+
def mk_oxm_fields(match_fields):
- oxm_fields=[
+ oxm_fields = [
ofp.ofp_oxm_field(
oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
ofb_field=field
@@ -383,6 +441,7 @@
return oxm_fields
+
def mk_instructions_from_actions(actions):
instructions_action = ofp.ofp_instruction_actions()
instructions_action.actions.extend(actions)
@@ -390,6 +449,7 @@
actions=instructions_action)
return [instruction]
+
def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
next_table_id=None, **kw):
"""
@@ -495,4 +555,4 @@
def mk_group_stat(**kw):
- return group_entry_from_group_mod(mk_multicast_group_mod(**kw))
\ No newline at end of file
+ return group_entry_from_group_mod(mk_multicast_group_mod(**kw))
diff --git a/adapters/docker/Dockerfile.protos b/adapters/docker/Dockerfile.protos
index 27e3db4..db70d13 100644
--- a/adapters/docker/Dockerfile.protos
+++ b/adapters/docker/Dockerfile.protos
@@ -19,14 +19,15 @@
FROM ${REGISTRY}${REPOSITORY}voltha-protoc:${TAG} as builder
MAINTAINER Voltha Community <info@opennetworking.org>
-COPY adapters/protos/third_party/google/api/*.proto /protos/google/api/
-COPY adapters/docker/config/Makefile.protos /protos/google/api/Makefile.protos
+COPY protos/third_party/google/api/*.proto /protos/google/api/
+COPY docker/config/Makefile.protos /protos/google/api/Makefile.protos
+COPY protos/*.proto /protos/voltha/
+COPY docker/config/Makefile.protos /protos/voltha/Makefile.protos
+
WORKDIR /protos
RUN make -f google/api/Makefile.protos google_api
RUN touch /protos/google/__init__.py /protos/google/api/__init__.py
-COPY protos/*.proto /protos/voltha/
-COPY adapters/docker/config/Makefile.protos /protos/voltha/Makefile.protos
WORKDIR /protos/voltha
RUN make -f Makefile.protos build
diff --git a/adapters/iadapter.py b/adapters/iadapter.py
index 0d32096..ee4d116 100644
--- a/adapters/iadapter.py
+++ b/adapters/iadapter.py
@@ -19,30 +19,37 @@
"""
import structlog
-from zope.interface import implementer
from twisted.internet import reactor
+from zope.interface import implementer
-from adapters.protos.common_pb2 import AdminState
-from adapters.protos.device_pb2 import DeviceType, DeviceTypes
from adapters.interface import IAdapterInterface
from adapters.protos.adapter_pb2 import Adapter
from adapters.protos.adapter_pb2 import AdapterConfig
+from adapters.protos.common_pb2 import AdminState
from adapters.protos.common_pb2 import LogLevel
+from adapters.protos.device_pb2 import DeviceType, DeviceTypes
from adapters.protos.health_pb2 import HealthStatus
-from adapters.protos.device_pb2 import Device
log = structlog.get_logger()
@implementer(IAdapterInterface)
class IAdapter(object):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type, vendor_id,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version,
+ device_type, vendor_id,
accepts_bulk_flow_update=True,
- accepts_add_remove_flow_updates=False, core_proxy=None):
- log.debug('Initializing adapter: {} {} {}'.format(vendor, name, version))
- self.adapter_agent = adapter_agent
- self.core_proxy=core_proxy
+ accepts_add_remove_flow_updates=False):
+ log.debug(
+ 'Initializing adapter: {} {} {}'.format(vendor, name, version))
+ self.core_proxy = core_proxy
+ self.adapter_proxy = adapter_proxy
self.config = config
self.name = name
self.supported_device_types = [
@@ -84,19 +91,24 @@
def get_ofp_device_info(self, device):
log.debug('get_ofp_device_info_start', device_id=device.id)
- ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(device)
+ ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(
+ device)
log.debug('get_ofp_device_info_ends', device_id=device.id)
return ofp_device_info
def get_ofp_port_info(self, device, port_no):
- log.debug('get_ofp_port_info_start', device_id=device.id, port_no=port_no)
- ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
- log.debug('get_ofp_port_info_ends', device_id=device.id, port_no=port_no)
+ log.debug('get_ofp_port_info_start', device_id=device.id,
+ port_no=port_no)
+ ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(
+ device, port_no)
+ log.debug('get_ofp_port_info_ends', device_id=device.id,
+ port_no=port_no)
return ofp_port_info
def adopt_device(self, device):
log.debug('adopt_device', device_id=device.id)
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
log.debug('adopt_device_done', device_id=device.id)
return device
@@ -142,7 +154,8 @@
def self_test_device(self, device):
log.info('self-test', device_id=device.id)
- result = reactor.callLater(0, self.devices_handlers[device.id].self_test_device)
+ result = reactor.callLater(0, self.devices_handlers[
+ device.id].self_test_device)
log.info('self-test-done', device_id=device.id)
return result
@@ -159,10 +172,10 @@
log.info('bulk-flow-update', device_id=device.id,
flows=flows, groups=groups)
assert len(groups.items) == 0
- reactor.callLater(0, self.devices_handlers[device.id].update_flow_table, flows.items)
+ reactor.callLater(0, self.devices_handlers[device.id].update_flow_table,
+ flows.items)
return device
-
def update_flows_incrementally(self, device, flow_changes, group_changes):
log.info('incremental-flow-update', device_id=device.id,
flows=flow_changes, groups=group_changes)
@@ -173,11 +186,13 @@
handler = self.devices_handlers[device.id]
# Remove flows
if len(flow_changes.to_remove.items) != 0:
- reactor.callLater(0, handler.remove_from_flow_table, flow_changes.to_remove.items)
+ reactor.callLater(0, handler.remove_from_flow_table,
+ flow_changes.to_remove.items)
# Add flows
if len(flow_changes.to_add.items) != 0:
- reactor.callLater(0, handler.add_to_flow_table, flow_changes.to_add.items)
+ reactor.callLater(0, handler.add_to_flow_table,
+ flow_changes.to_add.items)
return device
def update_pm_config(self, device, pm_config):
@@ -186,18 +201,12 @@
handler = self.devices_handlers[device.id]
handler.update_pm_config(device, pm_config)
- def send_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
-
- def receive_proxied_message(self, proxy_address, msg):
+ def process_inter_adapter_message(self, msg):
raise NotImplementedError()
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
raise NotImplementedError()
- def receive_inter_adapter_message(self, msg):
- raise NotImplementedError()
-
def suppress_alarm(self, filter):
raise NotImplementedError()
@@ -211,16 +220,25 @@
return handler
return None
+
"""
OLT Adapter base class
"""
+
+
class OltAdapter(IAdapter):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version, device_type,
accepts_bulk_flow_update=True,
- accepts_add_remove_flow_updates=False,
- core_proxy=None):
- super(OltAdapter, self).__init__(adapter_agent=adapter_agent,
+ accepts_add_remove_flow_updates=False):
+ super(OltAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=device_handler_class,
name=name,
@@ -229,13 +247,13 @@
device_type=device_type,
vendor_id=None,
accepts_bulk_flow_update=accepts_bulk_flow_update,
- accepts_add_remove_flow_updates=accepts_add_remove_flow_updates,
- core_proxy=None)
+ accepts_add_remove_flow_updates=accepts_add_remove_flow_updates)
self.logical_device_id_to_root_device_id = dict()
def reconcile_device(self, device):
try:
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
# Work only required for devices that are in ENABLED state
if device.admin_state == AdminState.ENABLED:
reactor.callLater(0,
@@ -244,7 +262,7 @@
else:
# Invoke the children reconciliation which would setup the
# basic children data structures
- self.adapter_agent.reconcile_child_devices(device.id)
+ self.core_proxy.reconcile_child_devices(device.id)
return device
except Exception, e:
log.exception('Exception', e=e)
@@ -254,11 +272,25 @@
handler = self.devices_handlers[proxy_address.device_id]
handler.send_proxied_message(proxy_address, msg)
+ def process_inter_adapter_message(self, msg):
+ log.info('process-inter-adapter-message', msg=msg)
+ # Unpack the header to know which device needs to handle this message
+ handler = None
+ if msg.header.proxy_device_id:
+ # typical request
+ handler = self.devices_handlers[msg.header.proxy_device_id]
+ elif msg.header.to_device_id and \
+ msg.header.to_device_id in self.devices_handlers:
+ # typical response
+ handler = self.devices_handlers[msg.header.to_device_id]
+ if handler:
+ reactor.callLater(0, handler.process_inter_adapter_message, msg)
+
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
def ldi_to_di(ldi):
di = self.logical_device_id_to_root_device_id.get(ldi)
if di is None:
- logical_device = self.adapter_agent.get_logical_device(ldi)
+ logical_device = self.core_proxy.get_logical_device(ldi)
di = logical_device.root_device_id
self.logical_device_id_to_root_device_id[ldi] = di
return di
@@ -274,10 +306,20 @@
class OnuAdapter(IAdapter):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type, vendor_id, accepts_bulk_flow_update=True,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version,
+ device_type,
+ vendor_id,
+ accepts_bulk_flow_update=True,
accepts_add_remove_flow_updates=False):
- super(OnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ super(OnuAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=device_handler_class,
name=name,
@@ -286,12 +328,11 @@
device_type=device_type,
vendor_id=vendor_id,
accepts_bulk_flow_update=accepts_bulk_flow_update,
- accepts_add_remove_flow_updates=accepts_add_remove_flow_updates,
- core_proxy=None
- )
+ accepts_add_remove_flow_updates=accepts_add_remove_flow_updates)
def reconcile_device(self, device):
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
# Reconcile only if state was ENABLED
if device.admin_state == AdminState.ENABLED:
reactor.callLater(0,
@@ -304,8 +345,15 @@
device_id=proxy_address.device_id, msg=msg)
# Device_id from the proxy_address is the olt device id. We need to
# get the onu device id using the port number in the proxy_address
- device = self.adapter_agent. \
+ device = self.core_proxy. \
get_child_device_with_proxy_address(proxy_address)
if device:
handler = self.devices_handlers[device.id]
handler.receive_message(msg)
+
+ def process_inter_adapter_message(self, msg):
+ log.info('process-inter-adapter-message', msg=msg)
+ # Unpack the header to know which device needs to handle this message
+ if msg.header:
+ handler = self.devices_handlers[msg.header.to_device_id]
+ handler.process_inter_adapter_message(msg)
diff --git a/adapters/interface.py b/adapters/interface.py
index d1ac455..b0390d8 100644
--- a/adapters/interface.py
+++ b/adapters/interface.py
@@ -259,209 +259,18 @@
:return: Proto Message (TBD)
"""
- # def start():
- # """
- # Called once after adapter instance is laoded. Can be used to async
- # initialization.
- # :return: (None or Deferred)
- # """
- #
- # def stop():
- # """
- # Called once before adapter is unloaded. It can be used to perform
- # any cleanup after the adapter.
- # :return: (None or Deferred)
- # """
- #
- # def receive_inter_adapter_message(msg):
- # """
- # Called when the adapter recieves a message that was sent to it directly
- # from another adapter. An adapter may register for these messages by calling
- # the register_for_inter_adapter_messages() method in the adapter agent.
- # Note that it is the responsibility of the sending and receiving
- # adapters to properly encode and decode the message.
- # :param msg: The message contents.
- # :return: None
- # """
- #
- # def send_proxied_message(proxy_address, msg):
- # """
- # Forward a msg to a child device of device, addressed by the given
- # proxy_address=Device.ProxyAddress().
- # :param proxy_address: Address info for the parent device
- # to route the message to the child device. This was given to the
- # child device by the parent device at the creation of the child
- # device.
- # :param msg: (str) The actual message to send.
- # :return: (Deferred(None) or None) The return of this method should
- # indicate that the message was successfully *sent*.
- # """
- #
- # def receive_proxied_message(proxy_address, msg):
- # """
- # Pass an async message (arrived via a proxy) to this device.
- # :param proxy_address: Address info for the parent device
- # to route the message to the child device. This was given to the
- # child device by the parent device at the creation of the child
- # device. Note this is the proxy_address with which the adapter
- # had to register prior to receiving proxied messages.
- # :param msg: (str) The actual message received.
- # :return: None
- # """
- #
- # def receive_packet_out(logical_device_id, egress_port_no, msg):
- # """
- # Pass a packet_out message content to adapter so that it can forward it
- # out to the device. This is only called on root devices.
- # :param logical_device_id:
- # :param egress_port: egress logical port number
- # :param msg: actual message
- # :return: None
- # """
- #
- # def change_master_state(master):
- # """
- # Called to indicate if plugin shall assume or lose master role. The
- # master role can be used to perform functions that must be performed
- # from a single point in the cluster. In single-node deployments of
- # Voltha, the plugins are always in master role.
- # :param master: (bool) True to indicate the mastership needs to be
- # assumed; False to indicate that mastership needs to be abandoned.
- # :return: (Deferred) which is fired by the adapter when mastership is
- # assumed/dropped, respectively.
- # """
+ def process_inter_adapter_message(msg):
+ """
+ Called when the adapter receives a message that was sent to it directly
+ from another adapter. An adapter is automatically registered for these
+ messages when creating the inter-container kafka proxy. Note that it is
+ the responsibility of the sending and receiving adapters to properly encode
+ and decode the message.
+ :param msg: Proto Message (any)
+ :return: Proto Message Response
+ """
-# class IAdapterAgent(Interface):
-# """
-# This object is passed in to the __init__ function of each adapter,
-# and can be used by the adapter implementation to initiate async calls
-# toward Voltha's CORE via the APIs defined here.
-# """
-#
-# def get_device(device_id):
-# # TODO add doc
-# """"""
-#
-# def add_device(device):
-# # TODO add doc
-# """"""
-#
-# def update_device(device):
-# # TODO add doc
-# """"""
-#
-# def add_port(device_id, port):
-# # TODO add doc
-# """"""
-#
-# def create_logical_device(logical_device):
-# # TODO add doc
-# """"""
-#
-# def add_logical_port(logical_device_id, port):
-# # TODO add doc
-# """"""
-#
-# def child_device_detected(parent_device_id,
-# parent_port_no,
-# child_device_type,
-# proxy_address,
-# admin_state,
-# **kw):
-# # TODO add doc
-# """"""
-#
-# def send_proxied_message(proxy_address, msg):
-# """
-# Forward a msg to a child device of device, addressed by the given
-# proxy_address=Device.ProxyAddress().
-# :param proxy_address: Address info for the parent device
-# to route the message to the child device. This was given to the
-# child device by the parent device at the creation of the child
-# device.
-# :param msg: (str) The actual message to send.
-# :return: (Deferred(None) or None) The return of this method should
-# indicate that the message was successfully *sent*.
-# """
-#
-# def receive_proxied_message(proxy_address, msg):
-# """
-# Pass an async message (arrived via a proxy) to this device.
-# :param proxy_address: Address info for the parent device
-# to route the message to the child device. This was given to the
-# child device by the parent device at the creation of the child
-# device. Note this is the proxy_address with which the adapter
-# had to register prior to receiving proxied messages.
-# :param msg: (str) The actual message received.
-# :return: None
-# """
-#
-# def register_for_proxied_messages(proxy_address):
-# """
-# A child device adapter can use this to indicate its intent to
-# receive async messages sent via a parent device. Example: an
-# ONU adapter can use this to register for OMCI messages received
-# via the OLT and the OLT adapter.
-# :param child_device_address: Address info that was given to the
-# child device by the parent device at the creation of the child
-# device. Its uniqueness acts as a router information for the
-# registration.
-# :return: None
-# """
-#
-# def unregister_for_proxied_messages(proxy_address):
-# """
-# Cancel a previous registration
-# :return:
-# """
-#
-# def send_packet_in(logical_device_id, logical_port_no, packet):
-# """
-# Forward given packet to the northbound toward an SDN controller.
-# :param device_id: logical device identifier
-# :param logical_port_no: logical port_no (as numbered in openflow)
-# :param packet: the actual packet; can be a serialized string or a scapy
-# Packet.
-# :return: None returned on success
-# """
-#
-# def submit_kpis(kpi_event_msg):
-# """
-# Submit KPI metrics on behalf of the OLT and its adapter. This can
-# include hardware related metrics, usage and utilization metrics, as
-# well as optional adapter specific metrics.
-# :param kpi_event_msg: A protobuf message of KpiEvent type.
-# :return: None
-# """
-#
-# def submit_alarm(device_id, alarm_event_msg):
-# """
-# Submit an alarm on behalf of the OLT and its adapter.
-# :param alarm_event_msg: A protobuf message of AlarmEvent type.
-# :return: None
-# """
-#
-# def register_for_onu_detect_state(proxy_address):
-# """
-#
-# :return: None
-# """
-#
-# def unregister_for_onu_detect_state(proxy_address):
-# """
-#
-# :return: None
-# """
-#
-# def forward_onu_detect_state(proxy_address, state):
-# """
-# Forward onu detect state to ONU adapter
-# :param proxy_address: ONU device address
-# :param state: ONU detect state (bool)
-# :return: None
-# """
-
class ICoreSouthBoundInterface(Interface):
"""
Represents a Voltha Core. This is used by an adapter to initiate async
@@ -520,7 +329,6 @@
:return: None
"""
-
def child_device_detected(parent_device_id,
parent_port_no,
child_device_type,
@@ -649,122 +457,3 @@
:param packet: The actual packet
:return: None
"""
-
- # def add_device(device):
- # # TODO add doc
- # """"""
-
- # def update_device(device):
- # # TODO add doc
- # """"""
-
- # def add_port(device_id, port):
- # # TODO add doc
- # """"""
-
- # def create_logical_device(logical_device):
- # # TODO add doc
- # """"""
- #
- # def add_logical_port(logical_device_id, port):
- # # TODO add doc
- # """"""
-
- # def child_device_detected(parent_device_id,
- # parent_port_no,
- # child_device_type,
- # proxy_address,
- # admin_state,
- # **kw):
- # # TODO add doc
- # """"""
-
- # def send_proxied_message(proxy_address, msg):
- # """
- # Forward a msg to a child device of device, addressed by the given
- # proxy_address=Device.ProxyAddress().
- # :param proxy_address: Address info for the parent device
- # to route the message to the child device. This was given to the
- # child device by the parent device at the creation of the child
- # device.
- # :param msg: (str) The actual message to send.
- # :return: (Deferred(None) or None) The return of this method should
- # indicate that the message was successfully *sent*.
- # """
- #
- # def receive_proxied_message(proxy_address, msg):
- # """
- # Pass an async message (arrived via a proxy) to this device.
- # :param proxy_address: Address info for the parent device
- # to route the message to the child device. This was given to the
- # child device by the parent device at the creation of the child
- # device. Note this is the proxy_address with which the adapter
- # had to register prior to receiving proxied messages.
- # :param msg: (str) The actual message received.
- # :return: None
- # """
- #
- # def register_for_proxied_messages(proxy_address):
- # """
- # A child device adapter can use this to indicate its intent to
- # receive async messages sent via a parent device. Example: an
- # ONU adapter can use this to register for OMCI messages received
- # via the OLT and the OLT adapter.
- # :param child_device_address: Address info that was given to the
- # child device by the parent device at the creation of the child
- # device. Its uniqueness acts as a router information for the
- # registration.
- # :return: None
- # """
- #
- # def unregister_for_proxied_messages(proxy_address):
- # """
- # Cancel a previous registration
- # :return:
- # """
- #
- # def submit_kpis(kpi_event_msg):
- # """
- # Submit KPI metrics on behalf of the OLT and its adapter. This can
- # include hardware related metrics, usage and utilization metrics, as
- # well as optional adapter specific metrics.
- # :param kpi_event_msg: A protobuf message of KpiEvent type.
- # :return: None
- # """
- #
- # def submit_alarm(device_id, alarm_event_msg):
- # """
- # Submit an alarm on behalf of the OLT and its adapter.
- # :param alarm_event_msg: A protobuf message of AlarmEvent type.
- # :return: None
- # """
-
- # def register_for_onu_detect_state(proxy_address):
- # """
- #
- # :return: None
- # """
- #
- # def unregister_for_onu_detect_state(proxy_address):
- # """
- #
- # :return: None
- # """
- #
- # def forward_onu_detect_state(proxy_address, state):
- # """
- # Forward onu detect state to ONU adapter
- # :param proxy_address: ONU device address
- # :param state: ONU detect state (bool)
- # :return: None
- # """
- #
- # def send_packet_in(logical_device_id, logical_port_no, packet):
- # """
- # Forward given packet to the northbound toward an SDN controller.
- # :param device_id: logical device identifier
- # :param logical_port_no: logical port_no (as numbered in openflow)
- # :param packet: the actual packet; can be a serialized string or a
- # scapy Packet.
- # :return: None returned on success
- # """
diff --git a/adapters/kafka/adapter_proxy.py b/adapters/kafka/adapter_proxy.py
new file mode 100644
index 0000000..2d4831a
--- /dev/null
+++ b/adapters/kafka/adapter_proxy.py
@@ -0,0 +1,110 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between adapters.
+"""
+
+import structlog
+from uuid import uuid4
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos import third_party
+from adapters.protos.core_adapter_pb2 import InterAdapterHeader, \
+ InterAdapterMessage
+import time
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdapterProxy(ContainerProxy):
+
+ def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ super(AdapterProxy, self).__init__(kafka_proxy,
+ core_topic,
+ my_listening_topic)
+
+ def _to_string(self, unicode_str):
+ if unicode_str is not None:
+ if type(unicode_str) == unicode:
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return ""
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def send_inter_adapter_message(self,
+ msg,
+ type,
+ from_adapter,
+ to_adapter,
+ to_device_id=None,
+ proxy_device_id=None,
+ message_no=None):
+ """
+ Sends a message directly to an adapter. This is typically used to send
+ proxied messages from one adapter to another. An initial ACK response
+ is sent back to the invoking adapter. If there is subsequent response
+ to be sent back (async) then the adapter receiving this request will
+ use this same API to send back the async response.
+ :param msg : GRPC message to send
+ :param type : InterAdapterMessageType of the message to send
+ :param from_adapter: Name of the adapter making the request.
+ :param to_adapter: Name of the remote adapter.
+ :param to_device_id: The ID of the device for to the message is
+ intended. if it's None then the message is not intended to a specific
+ device. Its interpretation is adapter specific.
+ :param proxy_device_id: The ID of the device which will proxy that
+ message. If it's None then there is no specific device to proxy the
+ message. Its interpretation is adapter specific.
+ :param message_no: A unique number for this transaction that the
+ adapter may use to correlate a request and an async response.
+ """
+
+ try:
+ # validate params
+ assert msg
+ assert from_adapter
+ assert to_adapter
+
+ # Build the inter adapter message
+ h = InterAdapterHeader()
+ h.type = type
+ h.from_topic = self._to_string(from_adapter)
+ h.to_topic = self._to_string(to_adapter)
+ h.to_device_id = self._to_string(to_device_id)
+ h.proxy_device_id = self._to_string(proxy_device_id)
+
+ if message_no:
+ h.id = self._to_string(message_no)
+ else:
+ h.id = uuid4().hex
+
+ h.timestamp = int(round(time.time() * 1000))
+ iaMsg = InterAdapterMessage()
+ iaMsg.header.CopyFrom(h)
+ iaMsg.body.Pack(msg)
+
+ log.debug("sending-inter-adapter-message", header=iaMsg.header)
+ res = yield self.invoke(rpc="process_inter_adapter_message",
+ to_topic=iaMsg.header.to_topic,
+ msg=iaMsg)
+ returnValue(res)
+ except Exception as e:
+ log.exception("error-sending-request", e=e)
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 3009206..67f7869 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -15,35 +15,18 @@
#
"""
-Agent to play gateway between CORE and an individual adapter.
+This facade handles kafka-formatted messages from the Core, extracts the kafka
+formatting and forwards the request to the concrete handler.
"""
-from uuid import uuid4
-import arrow
-import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks
from zope.interface import implementer
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
from adapters.interface import IAdapterInterface
+from adapters.protos.core_adapter_pb2 import IntType, InterAdapterMessage
from adapters.protos.device_pb2 import Device
-
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
-from adapters.common.utils.registry import registry
-from adapters.common.utils.id_generation import create_cluster_device_id
-from adapters.protos.core_adapter_pb2 import IntType
-from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, FlowGroupChanges
-import re
+from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
+ FlowGroupChanges
class MacAddressError(BaseException):
@@ -107,7 +90,6 @@
return True, self.adapter.get_ofp_port_info(d, p.val)
-
def reconcile_device(self, device):
return self.adapter.reconcile_device(device)
@@ -207,3 +189,11 @@
def unsuppress_alarm(self, filter):
return self.adapter.unsuppress_alarm(filter)
+ def process_inter_adapter_message(self, msg):
+ m = InterAdapterMessage()
+ if msg:
+ msg.Unpack(m)
+ else:
+ return (False, m)
+
+ return (True, self.adapter.process_inter_adapter_message(m))
diff --git a/adapters/kafka/container_proxy.py b/adapters/kafka/container_proxy.py
new file mode 100644
index 0000000..79918cd
--- /dev/null
+++ b/adapters/kafka/container_proxy.py
@@ -0,0 +1,114 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The superclass for all kafka proxy subclasses.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from adapters.common.utils.deferred_utils import DeferredWithTimeout, \
+ TimeOutError
+from adapters.common.utils.registry import IComponent
+
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(BaseException):
+ def __init__(self, error):
+ self.error = error
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+
+ def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ self.kafka_proxy = kafka_proxy
+ self.listening_topic = my_listening_topic
+ self.core_topic = core_topic
+ self.default_timeout = 3
+
+ def start(self):
+ log.info('started')
+
+ return self
+
+ def stop(self):
+ log.info('stopped')
+
+ @classmethod
+ def wrap_request(cls, return_cls):
+ def real_wrapper(func):
+ @inlineCallbacks
+ def wrapper(*args, **kw):
+ try:
+ (success, d) = yield func(*args, **kw)
+ if success:
+ log.debug("successful-response", func=func, val=d)
+ if return_cls is not None:
+ rc = return_cls()
+ if d is not None:
+ d.Unpack(rc)
+ returnValue(rc)
+ else:
+ log.debug("successful-response-none", func=func,
+ val=None)
+ returnValue(None)
+ else:
+ log.warn("unsuccessful-request", func=func, args=args,
+ kw=kw)
+ returnValue(d)
+ except Exception as e:
+ log.exception("request-wrapper-exception", func=func, e=e)
+ raise
+
+ return wrapper
+
+ return real_wrapper
+
+ @inlineCallbacks
+ def invoke(self, rpc, to_topic=None, **kwargs):
+ @inlineCallbacks
+ def _send_request(rpc, m_callback, to_topic, **kwargs):
+ try:
+ log.debug("sending-request", rpc=rpc)
+ if to_topic is None:
+ to_topic = self.core_topic
+ result = yield self.kafka_proxy.send_request(rpc=rpc,
+ to_topic=to_topic,
+ reply_topic=self.listening_topic,
+ callback=None,
+ **kwargs)
+ if not m_callback.called:
+ m_callback.callback(result)
+ else:
+ log.debug('timeout-already-occurred', rpc=rpc)
+ except Exception as e:
+ log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+ if not m_callback.called:
+ m_callback.errback(failure.Failure())
+
+ cb = DeferredWithTimeout(timeout=self.default_timeout)
+ _send_request(rpc, cb, to_topic, **kwargs)
+ try:
+ res = yield cb
+ returnValue(res)
+ except TimeOutError as e:
+ log.warn('invoke-timeout', e=e)
+ raise e
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 512262f..36459ed 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -1,5 +1,5 @@
#
-# Copyright 2017 the original author or authors.
+# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,122 +15,28 @@
#
"""
-Agent to play gateway between CORE and an individual adapter.
+Agent to play gateway between CORE and an adapter.
"""
-from uuid import uuid4
-
-import arrow
import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.python import failure
-from zope.interface import implementer
-
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
-from adapters.interface import IAdapterInterface
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AlarmFilterRuleKey, CoreInstance
-from adapters.common.utils.registry import registry, IComponent
-from adapters.common.utils.id_generation import create_cluster_device_id
-import re
-from adapters.interface import ICoreSouthBoundInterface
-from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
from google.protobuf.message import Message
-from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
+from adapters.protos.device_pb2 import Device, Ports
+from adapters.protos.voltha_pb2 import CoreInstance
log = structlog.get_logger()
-class KafkaMessagingError(BaseException):
- def __init__(self, error):
- self.error = error
-def wrap_request(return_cls):
- def real_wrapper(func):
- @inlineCallbacks
- def wrapper(*args, **kw):
- try:
- (success, d) = yield func(*args, **kw)
- if success:
- log.debug("successful-response", func=func, val=d)
- if return_cls is not None:
- rc = return_cls()
- if d is not None:
- d.Unpack(rc)
- returnValue(rc)
- else:
- log.debug("successful-response-none", func=func,
- val=None)
- returnValue(None)
- else:
- log.warn("unsuccessful-request", func=func, args=args, kw=kw)
- returnValue(d)
- except Exception as e:
- log.exception("request-wrapper-exception", func=func, e=e)
- raise
- return wrapper
- return real_wrapper
-
-
-@implementer(IComponent, ICoreSouthBoundInterface)
-class CoreProxy(object):
+class CoreProxy(ContainerProxy):
def __init__(self, kafka_proxy, core_topic, my_listening_topic):
- self.kafka_proxy = kafka_proxy
- self.listening_topic = my_listening_topic
- self.core_topic = core_topic
- self.default_timeout = 3
+ super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ my_listening_topic)
- def start(self):
- log.info('started')
-
- return self
-
- def stop(self):
- log.info('stopped')
-
- @inlineCallbacks
- def invoke(self, rpc, to_topic=None, **kwargs):
- @inlineCallbacks
- def _send_request(rpc, m_callback,to_topic, **kwargs):
- try:
- log.debug("sending-request", rpc=rpc)
- if to_topic is None:
- to_topic = self.core_topic
- result = yield self.kafka_proxy.send_request(rpc=rpc,
- to_topic=to_topic,
- reply_topic=self.listening_topic,
- callback=None,
- **kwargs)
- if not m_callback.called:
- m_callback.callback(result)
- else:
- log.debug('timeout-already-occurred', rpc=rpc)
- except Exception as e:
- log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
- if not m_callback.called:
- m_callback.errback(failure.Failure())
-
- log.debug('invoke-request', rpc=rpc)
- cb = DeferredWithTimeout(timeout=self.default_timeout)
- _send_request(rpc, cb, to_topic, **kwargs)
- try:
- res = yield cb
- returnValue(res)
- except TimeOutError as e:
- log.warn('invoke-timeout', e=e)
- raise e
-
-
- @wrap_request(CoreInstance)
+ @ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
def register(self, adapter):
log.debug("register")
@@ -142,7 +48,7 @@
log.exception("registration-exception", e=e)
raise
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_device(self, device_id):
log.debug("get-device")
@@ -151,15 +57,12 @@
res = yield self.invoke(rpc="GetDevice", device_id=id)
returnValue(res)
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_child_device(self, parent_device_id, **kwargs):
raise NotImplementedError()
- # def add_device(self, device):
- # raise NotImplementedError()
-
- @wrap_request(Ports)
+ @ContainerProxy.wrap_request(Ports)
@inlineCallbacks
def get_ports(self, device_id, port_type):
id = ID()
@@ -179,7 +82,7 @@
def _to_proto(self, **kwargs):
encoded = {}
- for k,v in kwargs.iteritems():
+ for k, v in kwargs.iteritems():
if isinstance(v, Message):
encoded[k] = v
elif type(v) == int:
@@ -196,8 +99,7 @@
encoded[k] = b_proto
return encoded
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_device_detected(self,
parent_device_id,
@@ -217,14 +119,13 @@
args = self._to_proto(**kw)
res = yield self.invoke(rpc="ChildDeviceDetected",
parent_device_id=id,
- parent_port_no = ppn,
- child_device_type= cdt,
+ parent_port_no=ppn,
+ child_device_type=cdt,
channel_id=channel,
**args)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_update(self, device):
log.debug("device_update")
@@ -234,21 +135,20 @@
def child_device_removed(parent_device_id, child_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -259,21 +159,20 @@
connect_status=c_status)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def children_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -284,7 +183,7 @@
connect_status=c_status)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_state_update(self,
device_id,
@@ -307,9 +206,7 @@
oper_status=o_status)
returnValue(res)
-
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_devices_state_update(self, parent_device_id,
oper_status=None,
@@ -318,12 +215,12 @@
id = ID()
id.id = parent_device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -334,12 +231,10 @@
connect_status=c_status)
returnValue(res)
-
def child_devices_removed(parent_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_pm_config_update(self, device_pm_config, init=False):
log.debug("device_pm_config_update")
@@ -349,16 +244,16 @@
device_pm_config=device_pm_config, init=b)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_created(self, device_id, port):
log.debug("port_created")
proto_id = ID()
proto_id.id = device_id
- res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
+ res = yield self.invoke(rpc="PortCreated", device_id=proto_id,
+ port=port)
returnValue(res)
-
def port_removed(device_id, port):
raise NotImplementedError()
diff --git a/adapters/kafka/kafka_inter_container_library.py b/adapters/kafka/kafka_inter_container_library.py
index f5bb720..3f6f5eb 100644
--- a/adapters/kafka/kafka_inter_container_library.py
+++ b/adapters/kafka/kafka_inter_container_library.py
@@ -14,28 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from zope.interface import Interface, implementer
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+import time
+from uuid import uuid4
+
+import structlog
+from afkak.client import KafkaClient
+from afkak.consumer import OFFSET_LATEST, Consumer
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
DeferredQueue, gatherResults
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
-import structlog
-from adapters.common.utils import asleep
-from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
- InterContainerRequestBody, InterContainerMessage, Header, InterContainerResponseBody
-import time
-from uuid import uuid4
-from adapters.common.utils.registry import IComponent
+from zope.interface import implementer
+from adapters.common.utils import asleep
+from adapters.common.utils.registry import IComponent
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
+ InterContainerRequestBody, InterContainerMessage, Header, \
+ InterContainerResponseBody
log = structlog.get_logger()
+
class KafkaMessagingError(BaseException):
def __init__(self, error):
self.error = error
+
@implementer(IComponent)
class IKafkaMessagingProxy(object):
_kafka_messaging_instance = None
@@ -115,7 +119,6 @@
except Exception as e:
log.exception("Failed-to-start-proxy", e=e)
-
def stop(self):
"""
Invoked to stop the kafka proxy
@@ -135,7 +138,6 @@
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
-
@inlineCallbacks
def _wait_until_topic_is_ready(self, client, topic):
e = True
@@ -165,7 +167,8 @@
for partition in partitions]
self.topic_consumer_map[topic] = consumers
- log.debug("_subscribe", topic=topic, consumermap=self.topic_consumer_map)
+ log.debug("_subscribe", topic=topic,
+ consumermap=self.topic_consumer_map)
if target_cls is not None and callback is None:
# Scenario #1
@@ -409,6 +412,7 @@
Default internal method invoked for every batch of messages received
from Kafka.
"""
+
def _toDict(args):
"""
Convert a repeatable Argument type into a python dictionary
@@ -443,24 +447,6 @@
message.ParseFromString(val)
if message.header.type == MessageType.Value("REQUEST"):
- # if self.num_messages == 0:
- # self.init_time = int(round(time.time() * 1000))
- # self.init_received_time = message.header.timestamp
- # log.debug("INIT_TIME", time=self.init_time,
- # time_sent=message.header.timestamp)
- # self.num_messages = self.num_messages + 1
- #
- # self.total_time = self.total_time + current_time - message.header.timestamp
- #
- # if self.num_messages % 10 == 0:
- # log.debug("TOTAL_TIME ...",
- # num=self.num_messages,
- # total=self.total_time,
- # duration=current_time - self.init_time,
- # time_since_first_msg=current_time - self.init_received_time,
- # average=self.total_time / 10)
- # self.total_time = 0
-
# Get the target class for that specific topic
targetted_topic = self._to_string(message.header.to_topic)
msg_body = InterContainerRequestBody()
@@ -497,16 +483,6 @@
elif message.header.type == MessageType.Value("RESPONSE"):
trns_id = self._to_string(message.header.id)
if trns_id in self.transaction_id_deferred_map:
- # self.num_responses = self.num_responses + 1
- # self.total_time_responses = self.total_time_responses + current_time - \
- # message.header.timestamp
- # if self.num_responses % 10 == 0:
- # log.debug("TOTAL RESPONSES ...",
- # num=self.num_responses,
- # total=self.total_time_responses,
- # average=self.total_time_responses / 10)
- # self.total_time_responses = 0
-
resp = self._parse_response(val)
self.transaction_id_deferred_map[trns_id].callback(resp)
@@ -568,9 +544,9 @@
self.transaction_id_deferred_map[
self._to_string(request.header.id)] = wait_for_result
- log.debug("BEFORE-SENDING", to_topic=to_topic, from_topic=reply_topic)
yield self._send_kafka_message(to_topic, request)
- log.debug("AFTER-SENDING", to_topic=to_topic, from_topic=reply_topic)
+ log.debug("message-sent", to_topic=to_topic,
+ from_topic=reply_topic)
if response_required:
res = yield wait_for_result
diff --git a/adapters/kafka/kafka_proxy.py b/adapters/kafka/kafka_proxy.py
index 10fdbf8..c11caa7 100644
--- a/adapters/kafka/kafka_proxy.py
+++ b/adapters/kafka/kafka_proxy.py
@@ -16,7 +16,7 @@
from afkak.client import KafkaClient as _KafkaClient
from afkak.common import (
- PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
+ PRODUCER_ACK_NOT_REQUIRED
)
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
@@ -24,9 +24,8 @@
from zope.interface import implementer
from adapters.common.utils.consulhelpers import get_endpoint_from_consul
-from adapters.kafka.event_bus_publisher import EventBusPublisher
from adapters.common.utils.registry import IComponent
-import time
+from adapters.kafka.event_bus_publisher import EventBusPublisher
log = get_logger()
@@ -96,21 +95,12 @@
log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
pass
- #try:
- # if self.event_bus_publisher:
- # yield self.event_bus_publisher.stop()
- # self.event_bus_publisher = None
- # log.debug('stopped-event-bus-publisher-kafka-proxy')
- #except Exception, e:
- # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
- # pass
-
log.debug('stopped-kafka-proxy')
except Exception, e:
self.kclient = None
self.kproducer = None
- #self.event_bus_publisher = None
+ # self.event_bus_publisher = None
log.exception('failed-stopped-kafka-proxy', e=e)
pass
@@ -122,7 +112,8 @@
if self.kafka_endpoint.startswith('@'):
try:
_k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
- self.kafka_endpoint[1:])
+ self.kafka_endpoint[
+ 1:])
log.debug('found-kafka-service', endpoint=_k_endpoint)
except Exception as e:
@@ -160,7 +151,8 @@
self._get_kafka_producer()
# Lets the next message request do the retry if still a failure
if self.kproducer is None:
- log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+ log.error('no-kafka-producer',
+ endpoint=self.kafka_endpoint)
return
# log.debug('sending-kafka-msg', topic=topic, msg=msg)
@@ -206,4 +198,3 @@
# Common method to get the singleton instance of the kafka proxy class
def get_kafka_proxy():
return KafkaProxy._kafka_instance
-
diff --git a/adapters/ponsim_olt/__init__.py b/adapters/ponsim_olt/__init__.py
index b0fb0b2..4a82628 100644
--- a/adapters/ponsim_olt/__init__.py
+++ b/adapters/ponsim_olt/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/adapters/ponsim_olt/main.py b/adapters/ponsim_olt/main.py
index 53745ee..c9ad9d0 100755
--- a/adapters/ponsim_olt/main.py
+++ b/adapters/ponsim_olt/main.py
@@ -18,30 +18,33 @@
"""Ponsim OLT Adapter main entry point"""
import argparse
-import arrow
import os
import time
+import arrow
import yaml
+from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer
-from adapters.protos import third_party
+
from adapters.common.structlog_setup import setup_logging, update_logging
+from adapters.common.utils.asleep import asleep
+from adapters.common.utils.deferred_utils import TimeOutError
from adapters.common.utils.dockerhelpers import get_my_containers_name
from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
get_my_primary_interface
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from adapters.common.utils.registry import registry, IComponent
-from packaging.version import Version
-from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
-from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
-from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
+from adapters.kafka.adapter_proxy import AdapterProxy
from adapters.kafka.adapter_request_facade import AdapterRequestFacade
from adapters.kafka.core_proxy import CoreProxy
-from adapters.common.utils.deferred_utils import TimeOutError
-from adapters.common.utils.asleep import asleep
+from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
+from adapters.protos import third_party
+from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
_ = third_party
@@ -358,8 +361,13 @@
core_topic=self.core_topic,
my_listening_topic=self.listening_topic)
+ self.adapter_proxy = AdapterProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
ponsim_olt_adapter = PonSimOltAdapter(
- adapter_agent=self.core_proxy, config=config)
+ core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy, config=config)
ponsim_request_handler = AdapterRequestFacade(
adapter=ponsim_olt_adapter)
@@ -376,6 +384,7 @@
).start()
self.core_proxy.kafka_proxy = get_messaging_proxy()
+ self.adapter_proxy.kafka_proxy = get_messaging_proxy()
# retry for ever
res = yield self._register_with_core(-1)
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 1806a33..88c6b4d 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -17,42 +17,39 @@
"""
Fully simulated OLT adapter.
"""
-from uuid import uuid4
import arrow
-import adapters.common.openflow.utils as fd
import grpc
import structlog
+from google.protobuf.empty_pb2 import Empty
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from grpc._channel import _Rendezvous
from scapy.layers.l2 import Ether, Dot1Q
+from simplejson import dumps
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
-from grpc._channel import _Rendezvous
+from twisted.internet.task import LoopingCall
from adapters.common.frameio.frameio import BpfProgramFilter, hexify
from adapters.common.utils.asleep import asleep
-from twisted.internet.task import LoopingCall
+from adapters.common.utils.registry import registry
from adapters.iadapter import OltAdapter
-from adapters.protos import third_party
-from adapters.protos import openflow_13_pb2 as ofp
+from adapters.kafka.kafka_proxy import get_kafka_proxy
from adapters.protos import ponsim_pb2
-from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from adapters.protos import third_party
+from adapters.protos.common_pb2 import OperStatus, ConnectStatus
+from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability, \
+ InterAdapterMessageType, InterAdapterResponseBody
from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
-from google.protobuf.empty_pb2 import Empty
-from adapters.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD, \
OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
ofp_switch_features, ofp_desc
from adapters.protos.openflow_13_pb2 import ofp_port
from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
-from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability
-from adapters.common.utils.registry import registry
-from adapters.kafka.kafka_proxy import get_kafka_proxy
-from simplejson import dumps
-from google.protobuf.json_format import MessageToDict
-from google.protobuf.message import Message
-
_ = third_party
log = structlog.get_logger()
@@ -61,9 +58,11 @@
is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
PACKET_IN_VLAN))
+
def mac_str_to_tuple(mac):
return tuple(int(d, 16) for d in mac.split(':'))
+
class AdapterPmMetrics:
def __init__(self, device):
self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
@@ -188,8 +187,9 @@
class PonSimOltAdapter(OltAdapter):
- def __init__(self, adapter_agent, config):
- super(PonSimOltAdapter, self).__init__(adapter_agent=adapter_agent,
+ def __init__(self, core_proxy, adapter_proxy, config):
+ super(PonSimOltAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=PonSimOltHandler,
name='ponsim_olt',
@@ -206,11 +206,11 @@
handler.update_pm_config(device, pm_config)
-
class PonSimOltHandler(object):
def __init__(self, adapter, device_id):
self.adapter = adapter
- self.adapter_agent = adapter.adapter_agent
+ self.core_proxy = adapter.core_proxy
+ self.adapter_proxy = adapter.adapter_proxy
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
self.channel = None
@@ -227,11 +227,12 @@
def get_channel(self):
if self.channel is None:
try:
- device = yield self.adapter_agent.get_device(self.device_id)
- self.log.info('device-info', device=device, host_port=device.host_and_port)
+ device = yield self.core_proxy.get_device(self.device_id)
+ self.log.info('device-info', device=device,
+ host_port=device.host_and_port)
self.channel = grpc.insecure_channel(device.host_and_port)
except Exception as e:
- log.exception("ponsim-connection-failure", e=e)
+ log.exception("ponsim-connection-failure", e=e)
# returnValue(self.channel)
@@ -252,7 +253,8 @@
@inlineCallbacks
def _get_nni_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ ports = yield self.core_proxy.get_ports(self.device_id,
+ Port.ETHERNET_NNI)
returnValue(ports)
@inlineCallbacks
@@ -263,7 +265,7 @@
if not device.host_and_port:
device.oper_status = OperStatus.FAILED
device.reason = 'No host_and_port field provided'
- self.adapter_agent.device_update(device)
+ self.core_proxy.device_update(device)
return
yield self.get_channel()
@@ -277,14 +279,13 @@
device.model = 'n/a'
device.serial_number = device.host_and_port
device.mac_address = "AA:BB:CC:DD:EE:FF"
- # device.connect_status = ConnectStatus.REACHABLE
- yield self.adapter_agent.device_update(device)
+ yield self.core_proxy.device_update(device)
# Now set the initial PM configuration for this device
self.pm_metrics = AdapterPmMetrics(device)
pm_config = self.pm_metrics.make_proto()
log.info("initial-pm-config", pm_config=pm_config)
- self.adapter_agent.device_pm_config_update(pm_config, init=True)
+ self.core_proxy.device_pm_config_update(pm_config, init=True)
# Setup alarm handler
self.alarms = AdapterAlarms(self.adapter, device)
@@ -293,26 +294,26 @@
port_no=info.nni_port,
label='NNI facing Ethernet port',
type=Port.ETHERNET_NNI,
- # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
)
self.nni_port = nni_port
- yield self.adapter_agent.port_created(device.id, nni_port)
- yield self.adapter_agent.port_created(device.id, Port(
+ yield self.core_proxy.port_created(device.id, nni_port)
+ yield self.core_proxy.port_created(device.id, Port(
port_no=1,
label='PON port',
type=Port.PON_OLT,
- # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
))
- yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.device_state_update(device.id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# register ONUS
self.log.info('onu-found', onus=info.onus, len=len(info.onus))
for onu in info.onus:
vlan_id = onu.uni_port
- yield self.adapter_agent.child_device_detected(
+ yield self.core_proxy.child_device_detected(
parent_device_id=device.id,
parent_port_no=1,
child_device_type='ponsim_onu',
@@ -329,7 +330,6 @@
except Exception as e:
log.exception("Exception-activating", e=e)
-
def get_ofp_device_info(self, device):
return SwitchCapability(
desc=ofp_desc(
@@ -342,10 +342,10 @@
n_buffers=256, # TODO fake for now
n_tables=2, # TODO ditto
capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
)
)
)
@@ -354,16 +354,14 @@
# Since the adapter created the device port then it has the reference of the port to
# return the capability. TODO: Do a lookup on the NNI port number and return the
# appropriate attributes
- self.log.info('get_ofp_port_info', port_no=port_no, info=self.ofp_port_no, device_id=device.id)
+ self.log.info('get_ofp_port_info', port_no=port_no,
+ info=self.ofp_port_no, device_id=device.id)
cap = OFPPF_1GB_FD | OFPPF_FIBER
return PortCapability(
- port = LogicalPort (
- # id='nni',
+ port=LogicalPort(
ofp_port=ofp_port(
- # port_no=port_no,
hw_addr=mac_str_to_tuple(
- '00:00:00:00:00:%02x' % port_no),
- # name='nni',
+ '00:00:00:00:00:%02x' % port_no),
config=0,
state=OFPPS_LIVE,
curr=cap,
@@ -374,65 +372,12 @@
),
device_id=device.id,
device_port_no=port_no
- )
+ )
)
# TODO - change for core 2.0
def reconcile(self, device):
- self.log.info('reconciling-OLT-device-starts')
-
- if not device.host_and_port:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No host_and_port field provided'
- self.adapter_agent.device_update(device)
- return
-
- try:
- stub = ponsim_pb2.PonSimStub(self.get_channel())
- info = stub.GetDeviceInfo(Empty())
- log.info('got-info', info=info)
- # TODO: Verify we are connected to the same device we are
- # reconciling - not much data in ponsim to differentiate at the
- # time
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.device_update(device)
- self.ofp_port_no = info.nni_port
- self.nni_port = self._get_nni_port()
- except Exception, e:
- log.exception('device-unreachable', e=e)
- device.connect_status = ConnectStatus.UNREACHABLE
- device.oper_status = OperStatus.UNKNOWN
- self.adapter_agent.device_update(device)
- return
-
- # Now set the initial PM configuration for this device
- self.pm_metrics = AdapterPmMetrics(device)
- pm_config = self.pm_metrics.make_proto()
- log.info("initial-pm-config", pm_config=pm_config)
- self.adapter_agent.device_update_pm_config(pm_config, init=True)
-
- # Setup alarm handler
- self.alarms = AdapterAlarms(self.adapter, device)
-
- # TODO: Is there anything required to verify nni and PON ports
-
- # Set the logical device id
- device = self.adapter_agent.get_device(device.id)
- if device.parent_id:
- self.logical_device_id = device.parent_id
- self.adapter_agent.reconcile_logical_device(device.parent_id)
- else:
- self.log.info('no-logical-device-set')
-
- # Reconcile child devices
- self.adapter_agent.reconcile_child_devices(device.id)
-
- reactor.callInThread(self.rcv_grpc)
-
- # Start collecting stats from the device after a brief pause
- self.start_kpi_collection(device.id)
-
- self.log.info('reconciling-OLT-device-ends')
+ self.log.info('reconciling-OLT-device')
@inlineCallbacks
def rcv_grpc(self):
@@ -459,7 +404,6 @@
self.log.info('stopped-receiving-grpc-frames')
-
@inlineCallbacks
def update_flow_table(self, flows):
yield self.get_channel()
@@ -495,7 +439,33 @@
stub = ponsim_pb2.PonSimStub(self.get_channel())
self.log.info('pushing-onu-flow-table', port=msg.port)
res = stub.UpdateFlowTable(msg)
- self.adapter_agent.receive_proxied_message(proxy_address, res)
+ self.core_proxy.receive_proxied_message(proxy_address, res)
+
+ @inlineCallbacks
+ def process_inter_adapter_message(self, request):
+ self.log.info('process-inter-adapter-message', msg=request)
+ try:
+ if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
+ f = FlowTable()
+ if request.body:
+ request.body.Unpack(f)
+ stub = ponsim_pb2.PonSimStub(self.channel)
+ self.log.info('pushing-onu-flow-table')
+ res = stub.UpdateFlowTable(f)
+ # Send response back
+ reply = InterAdapterResponseBody()
+ reply.success = True
+ self.log.info('sending-response-back', reply=reply)
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=reply,
+ type=InterAdapterMessageType.FLOW_RESPONSE,
+ from_adapter=self.adapter.name,
+ to_adapter=request.header.from_topic,
+ to_device_id=request.header.to_device_id,
+ message_no=request.header.id
+ )
+ except Exception as e:
+ self.log.exception("error-processing-inter-adapter-message", e=e)
def packet_out(self, egress_port, msg):
self.log.info('sending-packet-out', egress_port=egress_port,
@@ -515,19 +485,20 @@
# send over grpc stream
stub = ponsim_pb2.PonSimStub(self.get_channel())
- frame = PonSimFrame(id=self.device_id, payload=str(out_pkt), out_port=out_port)
+ frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
+ out_port=out_port)
stub.SendFrame(frame)
-
@inlineCallbacks
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
- # Update the child devices connect state to UNREACHABLE
- yield self.adapter_agent.children_state_update(self.device_id,
- connect_status=ConnectStatus.UNREACHABLE)
+ # Update the child devices connect state to UNREACHABLE
+ yield self.core_proxy.children_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
@@ -535,12 +506,12 @@
# Change the connection status back to REACHABLE. With a
# real OLT the connection state must be the actual state
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
-
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
# Update the child devices connect state to REACHABLE
- yield self.adapter_agent.children_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.children_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('rebooted', device_id=self.device_id)
@@ -553,7 +524,6 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
-
@inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
@@ -561,7 +531,9 @@
self.stop_kpi_collection()
# Update the operational status to UNKNOWN and connection status to UNREACHABLE
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE)
self.close_channel()
self.log.info('disabled-grpc-channel')
@@ -590,21 +562,21 @@
self.nni_port = ports.items[0]
# Update the state of the NNI port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.ETHERNET_NNI,
- port_no=self.ofp_port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_NNI,
+ port_no=self.ofp_port_no,
+ oper_status=OperStatus.ACTIVE)
# Update the state of the PON port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.PON_OLT,
- port_no=1,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.PON_OLT,
+ port_no=1,
+ oper_status=OperStatus.ACTIVE)
# Set the operational state of the device to ACTIVE and connect status to REACHABLE
- yield self.adapter_agent.device_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# TODO: establish frame grpc-stream
# yield reactor.callInThread(self.rcv_grpc)
diff --git a/adapters/ponsim_onu/__init__.py b/adapters/ponsim_onu/__init__.py
index b0fb0b2..4a82628 100644
--- a/adapters/ponsim_onu/__init__.py
+++ b/adapters/ponsim_onu/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/adapters/ponsim_onu/main.py b/adapters/ponsim_onu/main.py
index 63e2bc4..f4c57f4 100755
--- a/adapters/ponsim_onu/main.py
+++ b/adapters/ponsim_onu/main.py
@@ -40,6 +40,9 @@
from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
from adapters.kafka.adapter_request_facade import AdapterRequestFacade
from adapters.kafka.core_proxy import CoreProxy
+
+from adapters.kafka.adapter_proxy import AdapterProxy
+
from adapters.common.utils.deferred_utils import TimeOutError
from adapters.common.utils.asleep import asleep
@@ -357,8 +360,13 @@
core_topic=self.core_topic,
my_listening_topic=self.listening_topic)
+ self.adapter_proxy = AdapterProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
ponsim_onu_adapter = PonSimOnuAdapter(
- adapter_agent=self.core_proxy, config=config)
+ core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy, config=config)
ponsim_request_handler = AdapterRequestFacade(
adapter=ponsim_onu_adapter)
@@ -374,6 +382,7 @@
).start()
self.core_proxy.kafka_proxy = get_messaging_proxy()
+ self.adapter_proxy.kafka_proxy = get_messaging_proxy()
# retry for ever
res = yield self._register_with_core(-1)
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index 19775cb..dfac1d3 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -15,24 +15,24 @@
#
"""
-Fully simulated OLT/ONU adapter.
+Represents an ONU device
"""
-import sys
import structlog
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
-from adapters.common.utils.asleep import asleep
+from adapters.common.utils.asleep import asleep
from adapters.iadapter import OnuAdapter
from adapters.protos import third_party
from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from adapters.protos.core_adapter_pb2 import PortCapability, \
+ InterAdapterMessageType, InterAdapterResponseBody
from adapters.protos.device_pb2 import Port
from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD
from adapters.protos.openflow_13_pb2 import ofp_port
from adapters.protos.ponsim_pb2 import FlowTable
-from adapters.protos.core_adapter_pb2 import PortCapability
_ = third_party
log = structlog.get_logger()
@@ -41,11 +41,13 @@
def mac_str_to_tuple(mac):
return tuple(int(d, 16) for d in mac.split(':'))
+
class PonSimOnuAdapter(OnuAdapter):
- def __init__(self, adapter_agent, config):
+ def __init__(self, core_proxy, adapter_proxy, config):
# DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
# requires for identifying correct adapter or ranged ONU
- super(PonSimOnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=PonSimOnuHandler,
name='ponsim_onu',
@@ -60,8 +62,10 @@
class PonSimOnuHandler(object):
def __init__(self, adapter, device_id):
self.adapter = adapter
- self.adapter_agent = adapter.adapter_agent
+ self.core_proxy = adapter.core_proxy
+ self.adapter_proxy = adapter.adapter_proxy
self.device_id = device_id
+ self.device_parent_id = None
self.log = structlog.get_logger(device_id=device_id)
self.incoming_messages = DeferredQueue()
self.proxy_address = None
@@ -73,19 +77,18 @@
def receive_message(self, msg):
self.incoming_messages.put(msg)
-
@inlineCallbacks
def activate(self, device):
self.log.info('activating')
- # TODO: Register for proxy address
+ self.device_parent_id = device.parent_id
+ self.proxy_address = device.proxy_address
# populate device info
device.root = False
device.vendor = 'ponsim'
device.model = 'n/a'
- # device.connect_status = ConnectStatus.REACHABLE
- yield self.adapter_agent.device_update(device)
+ yield self.core_proxy.device_update(device)
# register physical ports
self.uni_port = Port(
@@ -108,11 +111,12 @@
)
]
)
- self.adapter_agent.port_created(device.id, self.uni_port)
- self.adapter_agent.port_created(device.id, self.pon_port)
+ self.core_proxy.port_created(device.id, self.uni_port)
+ self.core_proxy.port_created(device.id, self.pon_port)
- yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
-
+ yield self.core_proxy.device_state_update(device.id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# TODO: Return only port specific info
def get_ofp_port_info(self, device, port_no):
@@ -120,15 +124,11 @@
# return the capability. TODO: Do a lookup on the UNI port number and return the
# appropriate attributes
self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
- # port_no = device.proxy_address.channel_id
cap = OFPPF_1GB_FD | OFPPF_FIBER
return PortCapability(
- port = LogicalPort (
- # id='uni-{}'.format(port_no),
+ port=LogicalPort(
ofp_port=ofp_port(
- # port_no=port_no,
hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- # name='uni-{}'.format(port_no),
config=0,
state=OFPPS_LIVE,
curr=cap,
@@ -144,56 +144,55 @@
@inlineCallbacks
def _get_uni_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ ports = yield self.core_proxy.get_ports(self.device_id,
+ Port.ETHERNET_UNI)
returnValue(ports)
@inlineCallbacks
def _get_pon_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ ports = yield self.core_proxy.get_ports(self.device_id, Port.PON_ONU)
returnValue(ports)
def reconcile(self, device):
self.log.info('reconciling-ONU-device-starts')
-
- # first we verify that we got parent reference and proxy info
- assert device.parent_id
- assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
-
- # register for proxied messages right away
- self.proxy_address = device.proxy_address
- self.adapter_agent.register_for_proxied_messages(device.proxy_address)
-
- # Set the connection status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
-
- # TODO: Verify that the uni, pon and logical ports exists
-
- # Mark the device as REACHABLE and ACTIVE
- device = self.adapter_agent.get_device(device.id)
- device.connect_status = ConnectStatus.REACHABLE
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
-
- self.log.info('reconciling-ONU-device-ends')
+ # TODO: complete code
@inlineCallbacks
def update_flow_table(self, flows):
+ try:
+ self.log.info('update_flow_table', flows=flows)
+ # we need to proxy through the OLT to get to the ONU
- # we need to proxy through the OLT to get to the ONU
+ # reset response queue
+ while self.incoming_messages.pending:
+ yield self.incoming_messages.get()
- # reset response queue
- while self.incoming_messages.pending:
- yield self.incoming_messages.get()
+ fb = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+ # Sends the request via proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=fb,
+ type=InterAdapterMessageType.FLOW_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield self.incoming_messages.get()
+ self.log.info('response-received', result=res)
+ except Exception as e:
+ self.log.exception("update-flow-error", e=e)
- msg = FlowTable(
- port=self.proxy_address.channel_id,
- flows=flows
- )
- self.adapter_agent.send_proxied_message(self.proxy_address, msg)
-
- yield self.incoming_messages.get()
+ def process_inter_adapter_message(self, msg):
+ self.log.info('process-inter-adapter-message', msg=msg)
+ if msg.header.type == InterAdapterMessageType.FLOW_RESPONSE:
+ body = InterAdapterResponseBody()
+ msg.body.Unpack(body)
+ self.log.info('received-response', status=body.success)
+ self.receive_message(msg)
def remove_from_flow_table(self, flows):
self.log.debug('remove-from-flow-table', flows=flows)
@@ -212,7 +211,8 @@
self.log.info('rebooting', device_id=self.device_id)
# Update the connect status to UNREACHABLE
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
@@ -220,7 +220,8 @@
# Change the connection status back to REACHABLE. With a
# real ONU the connection state must be the actual state
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('rebooted', device_id=self.device_id)
@@ -233,18 +234,18 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
-
@inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
# Update the device operational status to UNKNOWN
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE)
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
-
self.log.info('disabled', device_id=self.device_id)
@inlineCallbacks
@@ -264,18 +265,20 @@
self.pon_port = ports.items[0]
# Update the state of the UNI port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.ETHERNET_UNI,
- port_no=self.uni_port.port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_UNI,
+ port_no=self.uni_port.port_no,
+ oper_status=OperStatus.ACTIVE)
# Update the state of the PON port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.PON_ONU,
- port_no=self.pon_port.port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.PON_ONU,
+ port_no=self.pon_port.port_no,
+ oper_status=OperStatus.ACTIVE)
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.ACTIVE,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('re-enabled', device_id=self.device_id)
except Exception, e:
diff --git a/protos/core_adapter.proto b/protos/core_adapter.proto
index d828194..8b52a56 100644
--- a/protos/core_adapter.proto
+++ b/protos/core_adapter.proto
@@ -73,4 +73,35 @@
message PortCapability {
LogicalPort port = 1;
+}
+
+message InterAdapterMessageType {
+ enum Types {
+ FLOW_REQUEST = 0;
+ FLOW_RESPONSE = 1;
+ OMCI_REQUEST = 2;
+ OMCI_RESPONSE = 3;
+ METRICS_REQUEST = 4;
+ METRICS_RESPONSE = 5;
+ }
+}
+
+message InterAdapterHeader {
+ string id = 1;
+ InterAdapterMessageType.Types type = 2;
+ string from_topic = 3;
+ string to_topic = 4;
+ string to_device_id = 5;
+ string proxy_device_id = 6;
+ int64 timestamp = 7;
+}
+
+message InterAdapterResponseBody {
+ bool success = 1;
+ google.protobuf.Any result = 2;
+}
+
+message InterAdapterMessage {
+ InterAdapterHeader header = 1;
+ google.protobuf.Any body = 2;
}
\ No newline at end of file
diff --git a/protos/device.proto b/protos/device.proto
index 9c2d98c..61056e6 100644
--- a/protos/device.proto
+++ b/protos/device.proto
@@ -243,11 +243,12 @@
message ProxyAddress {
string device_id = 1; // Which device to use as proxy to this device
- uint32 channel_id = 2; // Sub-address within proxy
- uint32 channel_group_id = 5; // Channel Group index
- string channel_termination = 6; // Channel Termination name
- uint32 onu_id = 3; // onu identifier; optional
- uint32 onu_session_id = 4; // session identifier for the ONU; optional
+ string device_type = 2; // The device type of the proxy device to use as the adapter name
+ uint32 channel_id = 3; // Sub-address within proxy
+ uint32 channel_group_id = 4; // Channel Group index
+ string channel_termination = 5; // Channel Termination name
+ uint32 onu_id = 6; // onu identifier; optional
+ uint32 onu_session_id = 7; // session identifier for the ONU; optional
};
// Device contact MAC address (format: "xx:xx:xx:xx:xx:xx")
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 936e945..b2ab478 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -323,7 +323,15 @@
childDevice.ParentId = parentDeviceId
childDevice.ParentPortNo = uint32(parentPortNo)
childDevice.Root = false
- childDevice.ProxyAddress = &voltha.Device_ProxyAddress{ChannelId: uint32(channelId)}
+
+ //Get parent device type
+ parent, err := dMgr.GetDevice(parentDeviceId)
+ if err != nil {
+ log.Error("no-parent-found", log.Fields{"parentId":parentDeviceId})
+ return status.Errorf(codes.NotFound, "%s", parentDeviceId)
+ }
+
+ childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceId, DeviceType: parent.Type, ChannelId: uint32(channelId)}
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
@@ -386,7 +394,7 @@
// Get the logical port associated with this device
var lPortId *voltha.LogicalPortId
if lPortId, err = dMgr.logicalDeviceMgr.getLogicalPortId(device); err != nil {
- log.Warnw("getLogical-port-error", log.Fields{"deviceId": device.Id})
+ log.Warnw("getLogical-port-error", log.Fields{"deviceId": device.Id, "error": err})
return err
}
if err = dMgr.logicalDeviceMgr.deleteLogicalPort(nil, lPortId); err != nil {