[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Cleanup the ponsim adapters to removed unsued references and
general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/ponsim_olt/__init__.py b/adapters/ponsim_olt/__init__.py
index b0fb0b2..4a82628 100644
--- a/adapters/ponsim_olt/__init__.py
+++ b/adapters/ponsim_olt/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/adapters/ponsim_olt/main.py b/adapters/ponsim_olt/main.py
index 53745ee..c9ad9d0 100755
--- a/adapters/ponsim_olt/main.py
+++ b/adapters/ponsim_olt/main.py
@@ -18,30 +18,33 @@
"""Ponsim OLT Adapter main entry point"""
import argparse
-import arrow
import os
import time
+import arrow
import yaml
+from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer
-from adapters.protos import third_party
+
from adapters.common.structlog_setup import setup_logging, update_logging
+from adapters.common.utils.asleep import asleep
+from adapters.common.utils.deferred_utils import TimeOutError
from adapters.common.utils.dockerhelpers import get_my_containers_name
from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
get_my_primary_interface
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from adapters.common.utils.registry import registry, IComponent
-from packaging.version import Version
-from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
-from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
-from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
+from adapters.kafka.adapter_proxy import AdapterProxy
from adapters.kafka.adapter_request_facade import AdapterRequestFacade
from adapters.kafka.core_proxy import CoreProxy
-from adapters.common.utils.deferred_utils import TimeOutError
-from adapters.common.utils.asleep import asleep
+from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
+from adapters.protos import third_party
+from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
_ = third_party
@@ -358,8 +361,13 @@
core_topic=self.core_topic,
my_listening_topic=self.listening_topic)
+ self.adapter_proxy = AdapterProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
ponsim_olt_adapter = PonSimOltAdapter(
- adapter_agent=self.core_proxy, config=config)
+ core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy, config=config)
ponsim_request_handler = AdapterRequestFacade(
adapter=ponsim_olt_adapter)
@@ -376,6 +384,7 @@
).start()
self.core_proxy.kafka_proxy = get_messaging_proxy()
+ self.adapter_proxy.kafka_proxy = get_messaging_proxy()
# retry for ever
res = yield self._register_with_core(-1)
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 1806a33..88c6b4d 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -17,42 +17,39 @@
"""
Fully simulated OLT adapter.
"""
-from uuid import uuid4
import arrow
-import adapters.common.openflow.utils as fd
import grpc
import structlog
+from google.protobuf.empty_pb2 import Empty
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from grpc._channel import _Rendezvous
from scapy.layers.l2 import Ether, Dot1Q
+from simplejson import dumps
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
-from grpc._channel import _Rendezvous
+from twisted.internet.task import LoopingCall
from adapters.common.frameio.frameio import BpfProgramFilter, hexify
from adapters.common.utils.asleep import asleep
-from twisted.internet.task import LoopingCall
+from adapters.common.utils.registry import registry
from adapters.iadapter import OltAdapter
-from adapters.protos import third_party
-from adapters.protos import openflow_13_pb2 as ofp
+from adapters.kafka.kafka_proxy import get_kafka_proxy
from adapters.protos import ponsim_pb2
-from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from adapters.protos import third_party
+from adapters.protos.common_pb2 import OperStatus, ConnectStatus
+from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability, \
+ InterAdapterMessageType, InterAdapterResponseBody
from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
-from google.protobuf.empty_pb2 import Empty
-from adapters.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD, \
OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
ofp_switch_features, ofp_desc
from adapters.protos.openflow_13_pb2 import ofp_port
from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
-from adapters.protos.core_adapter_pb2 import SwitchCapability, PortCapability
-from adapters.common.utils.registry import registry
-from adapters.kafka.kafka_proxy import get_kafka_proxy
-from simplejson import dumps
-from google.protobuf.json_format import MessageToDict
-from google.protobuf.message import Message
-
_ = third_party
log = structlog.get_logger()
@@ -61,9 +58,11 @@
is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
PACKET_IN_VLAN))
+
def mac_str_to_tuple(mac):
return tuple(int(d, 16) for d in mac.split(':'))
+
class AdapterPmMetrics:
def __init__(self, device):
self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
@@ -188,8 +187,9 @@
class PonSimOltAdapter(OltAdapter):
- def __init__(self, adapter_agent, config):
- super(PonSimOltAdapter, self).__init__(adapter_agent=adapter_agent,
+ def __init__(self, core_proxy, adapter_proxy, config):
+ super(PonSimOltAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=PonSimOltHandler,
name='ponsim_olt',
@@ -206,11 +206,11 @@
handler.update_pm_config(device, pm_config)
-
class PonSimOltHandler(object):
def __init__(self, adapter, device_id):
self.adapter = adapter
- self.adapter_agent = adapter.adapter_agent
+ self.core_proxy = adapter.core_proxy
+ self.adapter_proxy = adapter.adapter_proxy
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
self.channel = None
@@ -227,11 +227,12 @@
def get_channel(self):
if self.channel is None:
try:
- device = yield self.adapter_agent.get_device(self.device_id)
- self.log.info('device-info', device=device, host_port=device.host_and_port)
+ device = yield self.core_proxy.get_device(self.device_id)
+ self.log.info('device-info', device=device,
+ host_port=device.host_and_port)
self.channel = grpc.insecure_channel(device.host_and_port)
except Exception as e:
- log.exception("ponsim-connection-failure", e=e)
+ log.exception("ponsim-connection-failure", e=e)
# returnValue(self.channel)
@@ -252,7 +253,8 @@
@inlineCallbacks
def _get_nni_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ ports = yield self.core_proxy.get_ports(self.device_id,
+ Port.ETHERNET_NNI)
returnValue(ports)
@inlineCallbacks
@@ -263,7 +265,7 @@
if not device.host_and_port:
device.oper_status = OperStatus.FAILED
device.reason = 'No host_and_port field provided'
- self.adapter_agent.device_update(device)
+ self.core_proxy.device_update(device)
return
yield self.get_channel()
@@ -277,14 +279,13 @@
device.model = 'n/a'
device.serial_number = device.host_and_port
device.mac_address = "AA:BB:CC:DD:EE:FF"
- # device.connect_status = ConnectStatus.REACHABLE
- yield self.adapter_agent.device_update(device)
+ yield self.core_proxy.device_update(device)
# Now set the initial PM configuration for this device
self.pm_metrics = AdapterPmMetrics(device)
pm_config = self.pm_metrics.make_proto()
log.info("initial-pm-config", pm_config=pm_config)
- self.adapter_agent.device_pm_config_update(pm_config, init=True)
+ self.core_proxy.device_pm_config_update(pm_config, init=True)
# Setup alarm handler
self.alarms = AdapterAlarms(self.adapter, device)
@@ -293,26 +294,26 @@
port_no=info.nni_port,
label='NNI facing Ethernet port',
type=Port.ETHERNET_NNI,
- # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
)
self.nni_port = nni_port
- yield self.adapter_agent.port_created(device.id, nni_port)
- yield self.adapter_agent.port_created(device.id, Port(
+ yield self.core_proxy.port_created(device.id, nni_port)
+ yield self.core_proxy.port_created(device.id, Port(
port_no=1,
label='PON port',
type=Port.PON_OLT,
- # admin_state=AdminState.ENABLED,
oper_status=OperStatus.ACTIVE
))
- yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.device_state_update(device.id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# register ONUS
self.log.info('onu-found', onus=info.onus, len=len(info.onus))
for onu in info.onus:
vlan_id = onu.uni_port
- yield self.adapter_agent.child_device_detected(
+ yield self.core_proxy.child_device_detected(
parent_device_id=device.id,
parent_port_no=1,
child_device_type='ponsim_onu',
@@ -329,7 +330,6 @@
except Exception as e:
log.exception("Exception-activating", e=e)
-
def get_ofp_device_info(self, device):
return SwitchCapability(
desc=ofp_desc(
@@ -342,10 +342,10 @@
n_buffers=256, # TODO fake for now
n_tables=2, # TODO ditto
capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
)
)
)
@@ -354,16 +354,14 @@
# Since the adapter created the device port then it has the reference of the port to
# return the capability. TODO: Do a lookup on the NNI port number and return the
# appropriate attributes
- self.log.info('get_ofp_port_info', port_no=port_no, info=self.ofp_port_no, device_id=device.id)
+ self.log.info('get_ofp_port_info', port_no=port_no,
+ info=self.ofp_port_no, device_id=device.id)
cap = OFPPF_1GB_FD | OFPPF_FIBER
return PortCapability(
- port = LogicalPort (
- # id='nni',
+ port=LogicalPort(
ofp_port=ofp_port(
- # port_no=port_no,
hw_addr=mac_str_to_tuple(
- '00:00:00:00:00:%02x' % port_no),
- # name='nni',
+ '00:00:00:00:00:%02x' % port_no),
config=0,
state=OFPPS_LIVE,
curr=cap,
@@ -374,65 +372,12 @@
),
device_id=device.id,
device_port_no=port_no
- )
+ )
)
# TODO - change for core 2.0
def reconcile(self, device):
- self.log.info('reconciling-OLT-device-starts')
-
- if not device.host_and_port:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No host_and_port field provided'
- self.adapter_agent.device_update(device)
- return
-
- try:
- stub = ponsim_pb2.PonSimStub(self.get_channel())
- info = stub.GetDeviceInfo(Empty())
- log.info('got-info', info=info)
- # TODO: Verify we are connected to the same device we are
- # reconciling - not much data in ponsim to differentiate at the
- # time
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.device_update(device)
- self.ofp_port_no = info.nni_port
- self.nni_port = self._get_nni_port()
- except Exception, e:
- log.exception('device-unreachable', e=e)
- device.connect_status = ConnectStatus.UNREACHABLE
- device.oper_status = OperStatus.UNKNOWN
- self.adapter_agent.device_update(device)
- return
-
- # Now set the initial PM configuration for this device
- self.pm_metrics = AdapterPmMetrics(device)
- pm_config = self.pm_metrics.make_proto()
- log.info("initial-pm-config", pm_config=pm_config)
- self.adapter_agent.device_update_pm_config(pm_config, init=True)
-
- # Setup alarm handler
- self.alarms = AdapterAlarms(self.adapter, device)
-
- # TODO: Is there anything required to verify nni and PON ports
-
- # Set the logical device id
- device = self.adapter_agent.get_device(device.id)
- if device.parent_id:
- self.logical_device_id = device.parent_id
- self.adapter_agent.reconcile_logical_device(device.parent_id)
- else:
- self.log.info('no-logical-device-set')
-
- # Reconcile child devices
- self.adapter_agent.reconcile_child_devices(device.id)
-
- reactor.callInThread(self.rcv_grpc)
-
- # Start collecting stats from the device after a brief pause
- self.start_kpi_collection(device.id)
-
- self.log.info('reconciling-OLT-device-ends')
+ self.log.info('reconciling-OLT-device')
@inlineCallbacks
def rcv_grpc(self):
@@ -459,7 +404,6 @@
self.log.info('stopped-receiving-grpc-frames')
-
@inlineCallbacks
def update_flow_table(self, flows):
yield self.get_channel()
@@ -495,7 +439,33 @@
stub = ponsim_pb2.PonSimStub(self.get_channel())
self.log.info('pushing-onu-flow-table', port=msg.port)
res = stub.UpdateFlowTable(msg)
- self.adapter_agent.receive_proxied_message(proxy_address, res)
+ self.core_proxy.receive_proxied_message(proxy_address, res)
+
+ @inlineCallbacks
+ def process_inter_adapter_message(self, request):
+ self.log.info('process-inter-adapter-message', msg=request)
+ try:
+ if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
+ f = FlowTable()
+ if request.body:
+ request.body.Unpack(f)
+ stub = ponsim_pb2.PonSimStub(self.channel)
+ self.log.info('pushing-onu-flow-table')
+ res = stub.UpdateFlowTable(f)
+ # Send response back
+ reply = InterAdapterResponseBody()
+ reply.success = True
+ self.log.info('sending-response-back', reply=reply)
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=reply,
+ type=InterAdapterMessageType.FLOW_RESPONSE,
+ from_adapter=self.adapter.name,
+ to_adapter=request.header.from_topic,
+ to_device_id=request.header.to_device_id,
+ message_no=request.header.id
+ )
+ except Exception as e:
+ self.log.exception("error-processing-inter-adapter-message", e=e)
def packet_out(self, egress_port, msg):
self.log.info('sending-packet-out', egress_port=egress_port,
@@ -515,19 +485,20 @@
# send over grpc stream
stub = ponsim_pb2.PonSimStub(self.get_channel())
- frame = PonSimFrame(id=self.device_id, payload=str(out_pkt), out_port=out_port)
+ frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
+ out_port=out_port)
stub.SendFrame(frame)
-
@inlineCallbacks
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
- # Update the child devices connect state to UNREACHABLE
- yield self.adapter_agent.children_state_update(self.device_id,
- connect_status=ConnectStatus.UNREACHABLE)
+ # Update the child devices connect state to UNREACHABLE
+ yield self.core_proxy.children_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
@@ -535,12 +506,12 @@
# Change the connection status back to REACHABLE. With a
# real OLT the connection state must be the actual state
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
-
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
# Update the child devices connect state to REACHABLE
- yield self.adapter_agent.children_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.children_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('rebooted', device_id=self.device_id)
@@ -553,7 +524,6 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
-
@inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
@@ -561,7 +531,9 @@
self.stop_kpi_collection()
# Update the operational status to UNKNOWN and connection status to UNREACHABLE
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE)
self.close_channel()
self.log.info('disabled-grpc-channel')
@@ -590,21 +562,21 @@
self.nni_port = ports.items[0]
# Update the state of the NNI port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.ETHERNET_NNI,
- port_no=self.ofp_port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_NNI,
+ port_no=self.ofp_port_no,
+ oper_status=OperStatus.ACTIVE)
# Update the state of the PON port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.PON_OLT,
- port_no=1,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.PON_OLT,
+ port_no=1,
+ oper_status=OperStatus.ACTIVE)
# Set the operational state of the device to ACTIVE and connect status to REACHABLE
- yield self.adapter_agent.device_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# TODO: establish frame grpc-stream
# yield reactor.callInThread(self.rcv_grpc)