[VOL-1034] This commit consists of:
1) Implement PM collection from the ONU
2) Update the registration method to include the adapter descriptor
and its supported device types.
Change-Id: Id984468546328b6ebf2ca47578675c69b2b66f01
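
For context, a minimal, illustrative sketch of the updated registration flow, assuming the
adapter_descriptor() and device_types() helpers used by the ponsim adapters below:

    # Adapters now register their descriptor together with the device types
    # they support; the Core request handler unpacks the two arguments by key.
    @inlineCallbacks
    def _register_with_core(self):
        resp = yield self.core_proxy.register(
            self.adapter.adapter_descriptor(),  # voltha.Adapter descriptor
            self.adapter.device_types())        # voltha.DeviceTypes supported
        if resp:
            self.log.info('registered-with-core', coreId=resp.instance_id)
        returnValue(resp)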
diff --git a/adapters/kafka/adapter_proxy.py b/adapters/kafka/adapter_proxy.py
index 2d4831a..fad1093 100644
--- a/adapters/kafka/adapter_proxy.py
+++ b/adapters/kafka/adapter_proxy.py
@@ -56,7 +56,7 @@
to_adapter,
to_device_id=None,
proxy_device_id=None,
- message_no=None):
+ message_id=None):
"""
Sends a message directly to an adapter. This is typically used to send
proxied messages from one adapter to another. An initial ACK response
@@ -73,7 +73,7 @@
:param proxy_device_id: The ID of the device which will proxy that
message. If it's None then there is no specific device to proxy the
message. Its interpretation is adapter specific.
- :param message_no: A unique number for this transaction that the
+    :param message_id: A unique identifier for this transaction that the
adapter may use to correlate a request and an async response.
"""
@@ -91,8 +91,8 @@
h.to_device_id = self._to_string(to_device_id)
h.proxy_device_id = self._to_string(proxy_device_id)
- if message_no:
- h.id = self._to_string(message_no)
+ if message_id:
+ h.id = self._to_string(message_id)
else:
h.id = uuid4().hex
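
A minimal sketch of the request/response correlation that message_id enables, mirroring the
deferred-map pattern used in ponsim_onu.py further below (names such as pending and
send_and_wait are illustrative only):

    from uuid import uuid4
    from twisted.internet.defer import Deferred, inlineCallbacks, returnValue

    pending = {}  # in-flight transactions keyed by message id

    @inlineCallbacks
    def send_and_wait(adapter_proxy, msg, **kwargs):
        # Register a Deferred under a fresh transaction id before sending.
        trns_id = uuid4().hex
        wait_for_result = Deferred()
        pending[trns_id] = wait_for_result
        yield adapter_proxy.send_inter_adapter_message(
            msg=msg, message_id=trns_id, **kwargs)
        # The adapter's message handler fires the Deferred when the reply
        # carrying the same header id comes back (see receive_message below).
        res = yield wait_for_result
        del pending[trns_id]
        returnValue(res)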
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 36459ed..cc3f081 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -38,10 +38,12 @@
@ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
- def register(self, adapter):
+ def register(self, adapter, deviceTypes):
log.debug("register")
try:
- res = yield self.invoke(rpc="Register", adapter=adapter)
+ res = yield self.invoke(rpc="Register",
+ adapter=adapter,
+ deviceTypes=deviceTypes)
log.info("registration-returned", res=res)
returnValue(res)
except Exception as e:
diff --git a/adapters/ponsim_olt/main.py b/adapters/ponsim_olt/main.py
index c9ad9d0..569e284 100755
--- a/adapters/ponsim_olt/main.py
+++ b/adapters/ponsim_olt/main.py
@@ -44,7 +44,7 @@
from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from adapters.ponsim_olt.ponsim_olt import PonSimOltAdapter
from adapters.protos import third_party
-from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
+from adapters.protos.adapter_pb2 import AdapterConfig
_ = third_party
@@ -284,6 +284,7 @@
if not args.no_banner:
print_banner(self.log)
+ self.adapter = None
# Create a unique instance id using the passed-in instance id and
# UTC timestamp
current_time = arrow.utcnow().timestamp
@@ -366,10 +367,11 @@
core_topic=self.core_topic,
my_listening_topic=self.listening_topic)
- ponsim_olt_adapter = PonSimOltAdapter(
- core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy, config=config)
- ponsim_request_handler = AdapterRequestFacade(
- adapter=ponsim_olt_adapter)
+ self.adapter = PonSimOltAdapter(core_proxy=self.core_proxy,
+ adapter_proxy=self.adapter_proxy,
+ config=config)
+
+ ponsim_request_handler = AdapterRequestFacade(adapter=self.adapter)
yield registry.register(
'kafka_adapter_proxy',
@@ -423,15 +425,14 @@
@inlineCallbacks
def _register_with_core(self, retries):
- # Send registration to Core with adapter specs
- adapter = Adapter()
- adapter.id = self.args.name
- adapter.vendor = self.args.name
- adapter.version = self.ponsim_olt_adapter_version
while 1:
try:
- resp = yield self.core_proxy.register(adapter)
- self.log.info('registration-response', response=resp)
+ resp = yield self.core_proxy.register(
+ self.adapter.adapter_descriptor(),
+ self.adapter.device_types())
+ if resp:
+ self.log.info('registered-with-core',
+ coreId=resp.instance_id)
returnValue(resp)
except TimeOutError as e:
self.log.warn("timeout-when-registering-with-core", e=e)
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 88c6b4d..52fb63b 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -49,7 +49,7 @@
OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
ofp_switch_features, ofp_desc
from adapters.protos.openflow_13_pb2 import ofp_port
-from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame
+from adapters.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
_ = third_party
log = structlog.get_logger()
@@ -454,7 +454,7 @@
res = stub.UpdateFlowTable(f)
# Send response back
reply = InterAdapterResponseBody()
- reply.success = True
+ reply.status = True
self.log.info('sending-response-back', reply=reply)
yield self.adapter_proxy.send_inter_adapter_message(
msg=reply,
@@ -462,7 +462,27 @@
from_adapter=self.adapter.name,
to_adapter=request.header.from_topic,
to_device_id=request.header.to_device_id,
- message_no=request.header.id
+ message_id=request.header.id
+ )
+ elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
+ m = PonSimMetricsRequest()
+ if request.body:
+ request.body.Unpack(m)
+ stub = ponsim_pb2.PonSimStub(self.channel)
+            self.log.info('proxying-onu-stats-request', port=m.port)
+ res = stub.GetStats(m)
+ # Send response back
+ reply = InterAdapterResponseBody()
+ reply.status = True
+ reply.body.Pack(res)
+ self.log.info('sending-response-back', reply=reply)
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=reply,
+ type=InterAdapterMessageType.METRICS_RESPONSE,
+ from_adapter=self.adapter.name,
+ to_adapter=request.header.from_topic,
+ to_device_id=request.header.to_device_id,
+ message_id=request.header.id
)
except Exception as e:
self.log.exception("error-processing-inter-adapter-message", e=e)
@@ -538,6 +558,8 @@
self.close_channel()
self.log.info('disabled-grpc-channel')
+ self.stop_kpi_collection()
+
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
@@ -624,7 +646,7 @@
}
)
- # Step 3: submit directlt to kafka bus
+ # Step 3: submit directly to the kafka bus
if kafka_cluster_proxy:
if isinstance(kpi_event, Message):
kpi_event = dumps(MessageToDict(kpi_event, True, True))
diff --git a/adapters/ponsim_onu/main.py b/adapters/ponsim_onu/main.py
index f4c57f4..3f18e50 100755
--- a/adapters/ponsim_onu/main.py
+++ b/adapters/ponsim_onu/main.py
@@ -18,33 +18,33 @@
"""Ponsim ONU Adapter main entry point"""
import argparse
-import arrow
import os
import time
+import arrow
import yaml
+from packaging.version import Version
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import implementer
-from adapters.protos import third_party
+
from adapters.common.structlog_setup import setup_logging, update_logging
+from adapters.common.utils.asleep import asleep
+from adapters.common.utils.deferred_utils import TimeOutError
from adapters.common.utils.dockerhelpers import get_my_containers_name
from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
get_my_primary_interface
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from adapters.common.utils.registry import registry, IComponent
-from packaging.version import Version
-from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
-from adapters.ponsim_onu.ponsim_onu import PonSimOnuAdapter
-from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
+from adapters.kafka.adapter_proxy import AdapterProxy
from adapters.kafka.adapter_request_facade import AdapterRequestFacade
from adapters.kafka.core_proxy import CoreProxy
-
-from adapters.kafka.adapter_proxy import AdapterProxy
-
-from adapters.common.utils.deferred_utils import TimeOutError
-from adapters.common.utils.asleep import asleep
+from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.ponsim_onu.ponsim_onu import PonSimOnuAdapter
+from adapters.protos import third_party
+from adapters.protos.adapter_pb2 import AdapterConfig
_ = third_party
@@ -283,6 +283,7 @@
if not args.no_banner:
print_banner(self.log)
+ self.adapter = None
# Create a unique instance id using the passed-in instance id and
# UTC timestamp
current_time = arrow.utcnow().timestamp
@@ -365,10 +366,11 @@
core_topic=self.core_topic,
my_listening_topic=self.listening_topic)
- ponsim_onu_adapter = PonSimOnuAdapter(
- core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy, config=config)
+ self.adapter = PonSimOnuAdapter(
+ core_proxy=self.core_proxy, adapter_proxy=self.adapter_proxy,
+ config=config)
ponsim_request_handler = AdapterRequestFacade(
- adapter=ponsim_onu_adapter)
+ adapter=self.adapter)
yield registry.register(
'kafka_adapter_proxy',
@@ -421,15 +423,15 @@
@inlineCallbacks
def _register_with_core(self, retries):
- # Send registration to Core with adapter specs
- adapter = Adapter()
- adapter.id = self.args.name
- adapter.vendor = self.args.name
- adapter.version = self.ponsim_olt_adapter_version
while 1:
try:
- resp = yield self.core_proxy.register(adapter)
- self.log.info('registration-response', response=resp)
+ resp = yield self.core_proxy.register(
+ self.adapter.adapter_descriptor(),
+ self.adapter.device_types())
+ if resp:
+ self.log.info('registered-with-core',
+ coreId=resp.instance_id)
+
returnValue(resp)
except TimeOutError as e:
self.log.warn("timeout-when-registering-with-core", e=e)
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index dfac1d3..e15d0a9 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -18,21 +18,31 @@
Represents an ONU device
"""
+from uuid import uuid4
+
+import arrow
import structlog
-from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, \
+ returnValue, Deferred
+from twisted.internet.task import LoopingCall
from adapters.common.utils.asleep import asleep
from adapters.iadapter import OnuAdapter
+from adapters.kafka.kafka_proxy import get_kafka_proxy
from adapters.protos import third_party
from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
from adapters.protos.core_adapter_pb2 import PortCapability, \
InterAdapterMessageType, InterAdapterResponseBody
-from adapters.protos.device_pb2 import Port
+from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
+from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD
from adapters.protos.openflow_13_pb2 import ofp_port
-from adapters.protos.ponsim_pb2 import FlowTable
+from adapters.protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
_ = third_party
log = structlog.get_logger()
@@ -42,9 +52,97 @@
return tuple(int(d, 16) for d in mac.split(':'))
+class AdapterPmMetrics:
+ def __init__(self, device):
+ self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ 'rx_64_pkts', 'rx_65_127_pkts',
+ 'rx_128_255_pkts', 'rx_256_511_pkts',
+ 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ 'rx_1519_9k_pkts'}
+ self.device = device
+ self.id = device.id
+ self.name = 'ponsim_onu'
+ self.default_freq = 150
+ self.grouped = False
+ self.freq_override = False
+ self.pm_metrics = None
+ self.pon_metrics_config = dict()
+ self.uni_metrics_config = dict()
+ self.lc = None
+ for m in self.pm_names:
+ self.pon_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+ self.uni_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+
+ def update(self, pm_config):
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
+ for m in pm_config.metrics:
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.uni_metrics_config[m.name].enabled = m.enabled
+
+ def make_proto(self):
+ pm_config = PmConfigs(
+ id=self.id,
+ default_freq=self.default_freq,
+ grouped=False,
+ freq_override=False)
+ for m in sorted(self.pon_metrics_config):
+            pm = self.pon_metrics_config[m]  # either will do; they're the same
+ pm_config.metrics.extend([PmConfig(name=pm.name,
+ type=pm.type,
+ enabled=pm.enabled)])
+ return pm_config
+
+ def extract_metrics(self, stats):
+ rtrn_port_metrics = dict()
+ rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+ rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
+ return rtrn_port_metrics
+
+ def extract_pon_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "pon":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def extract_uni_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "uni":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def start_collector(self, callback):
+ log.info("starting-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+ self.lc = LoopingCall(callback, self.device.id, prefix)
+ self.lc.start(interval=self.default_freq / 10)
+
+ def stop_collector(self):
+ log.info("stopping-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ self.lc.stop()
+
+
class PonSimOnuAdapter(OnuAdapter):
def __init__(self, core_proxy, adapter_proxy, config):
- # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
+    # DeviceType of ONU should be the same as the VENDOR ID of the ONU
+    # Serial Number, as specified by the standard
# requires for identifying correct adapter or ranged ONU
super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
adapter_proxy=adapter_proxy,
@@ -68,14 +166,27 @@
self.device_parent_id = None
self.log = structlog.get_logger(device_id=device_id)
self.incoming_messages = DeferredQueue()
+ self.inter_adapter_message_deferred_map = {}
self.proxy_address = None
# reference of uni_port is required when re-enabling the device if
# it was disabled previously
self.uni_port = None
self.pon_port = None
+ def _to_string(self, unicode_str):
+ if unicode_str is not None:
+ if type(unicode_str) == unicode:
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return ""
+
def receive_message(self, msg):
- self.incoming_messages.put(msg)
+ trns_id = self._to_string(msg.header.id)
+ if trns_id in self.inter_adapter_message_deferred_map:
+ self.inter_adapter_message_deferred_map[trns_id].callback(msg)
+ # self.incoming_messages.put(msg)
@inlineCallbacks
def activate(self, device):
@@ -90,6 +201,12 @@
device.model = 'n/a'
yield self.core_proxy.device_update(device)
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.core_proxy.device_pm_config_update(pm_config, init=True)
+
# register physical ports
self.uni_port = Port(
port_no=2,
@@ -118,10 +235,15 @@
connect_status=ConnectStatus.REACHABLE,
oper_status=OperStatus.ACTIVE)
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
# TODO: Return only port specific info
def get_ofp_port_info(self, device, port_no):
- # Since the adapter created the device port then it has the reference of the port to
- # return the capability. TODO: Do a lookup on the UNI port number and return the
+        # Since the adapter created the device port, it has the reference
+        # of the port to return the capability.
+        # TODO: Do a lookup on the UNI port number and return the
# appropriate attributes
self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
cap = OFPPF_1GB_FD | OFPPF_FIBER
@@ -159,18 +281,22 @@
@inlineCallbacks
def update_flow_table(self, flows):
+ trnsId = None
try:
self.log.info('update_flow_table', flows=flows)
# we need to proxy through the OLT to get to the ONU
- # reset response queue
- while self.incoming_messages.pending:
- yield self.incoming_messages.get()
-
fb = FlowTable(
port=self.proxy_address.channel_id,
flows=flows
)
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
# Sends the request via proxy and wait for an ACK
yield self.adapter_proxy.send_inter_adapter_message(
msg=fb,
@@ -178,21 +304,25 @@
from_adapter=self.adapter.name,
to_adapter=self.proxy_address.device_type,
to_device_id=self.device_id,
- proxy_device_id=self.proxy_address.device_id
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
)
# Wait for the full response from the proxied adapter
- res = yield self.incoming_messages.get()
- self.log.info('response-received', result=res)
+ res = yield wait_for_result
+ if res.header.type == InterAdapterMessageType.FLOW_RESPONSE:
+ body = InterAdapterResponseBody()
+ res.body.Unpack(body)
+ self.log.info('response-received', result=body.status)
except Exception as e:
self.log.exception("update-flow-error", e=e)
+ finally:
+ if trnsId in self.inter_adapter_message_deferred_map:
+ del self.inter_adapter_message_deferred_map[trnsId]
def process_inter_adapter_message(self, msg):
+ # We expect only responses on the ONU side
self.log.info('process-inter-adapter-message', msg=msg)
- if msg.header.type == InterAdapterMessageType.FLOW_RESPONSE:
- body = InterAdapterResponseBody()
- msg.body.Unpack(body)
- self.log.info('received-response', status=body.success)
- self.receive_message(msg)
+ self.receive_message(msg)
def remove_from_flow_table(self, flows):
self.log.debug('remove-from-flow-table', flows=flows)
@@ -243,6 +373,8 @@
oper_status=OperStatus.UNKNOWN,
connect_status=ConnectStatus.UNREACHABLE)
+ self.stop_kpi_collection()
+
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
@@ -280,6 +412,8 @@
oper_status=OperStatus.ACTIVE,
connect_status=ConnectStatus.REACHABLE)
+ self.start_kpi_collection(self.device_id)
+
self.log.info('re-enabled', device_id=self.device_id)
except Exception, e:
self.log.exception('error-reenabling', e=e)
@@ -292,3 +426,83 @@
# 2) Remove the device from ponsim
self.log.info('deleted', device_id=self.device_id)
+
+ def start_kpi_collection(self, device_id):
+ kafka_cluster_proxy = get_kafka_proxy()
+
+ @inlineCallbacks
+ def _collect(device_id, prefix):
+ try:
+ self.log.debug("pm-collection-interval")
+                # Proxy a message to ponsim_olt. The OLT will then query the
+                # ONU for statistics. The reply will arrive proxied back to
+                # us in self.receive_message().
+ msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
+ # Sends the request via proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=msg,
+ type=InterAdapterMessageType.METRICS_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield wait_for_result
+ # Remove the transaction from the transaction map
+ del self.inter_adapter_message_deferred_map[self._to_string(trnsId)]
+
+            # The message is a reply to an ONU statistics request. Push it
+            # out to Kafka via the kafka cluster proxy.
+ if res.header.type == InterAdapterMessageType.METRICS_RESPONSE:
+ msg = InterAdapterResponseBody()
+ res.body.Unpack(msg)
+ self.log.debug('metrics-response-received', result=msg.status)
+ if self.pm_metrics:
+                    self.log.debug('handling-incoming-onu-metrics')
+ response = PonSimMetrics()
+ msg.body.Unpack(response)
+ port_metrics = self.pm_metrics.extract_metrics(response)
+ try:
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+                            # ONU UNI port
+ prefix + '.uni': MetricValuePairs(
+ metrics=port_metrics['uni']),
+ # OLT PON port
+                            # ONU PON port
+ metrics=port_metrics['pon'])
+ }
+ )
+
+                        self.log.debug('submitting-kpis-for-onu-metrics')
+
+                        # Submit the KPI event directly to the Kafka bus
+ if kafka_cluster_proxy:
+ if isinstance(kpi_event, Message):
+ kpi_event = dumps(
+ MessageToDict(kpi_event, True, True))
+ kafka_cluster_proxy.send_message("voltha.kpis",
+ kpi_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+ except Exception as e:
+ log.exception('failed-to-collect-metrics', e=e)
+
+ self.pm_metrics.start_collector(_collect)
+
+ def stop_kpi_collection(self):
+ self.pm_metrics.stop_collector()
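
For reference, a minimal sketch of the KPI event that the collector above publishes on the
voltha.kpis topic; the metric names and values are illustrative, and the prefix follows the
voltha.<adapter name>.<device id> convention set in start_collector():

    # Uses the imports added in ponsim_onu.py above (arrow, dumps, KpiEvent,
    # KpiEventType, MetricValuePairs, MessageToDict).
    ts = arrow.utcnow().timestamp
    kpi_event = KpiEvent(
        type=KpiEventType.slice,
        ts=ts,
        prefixes={
            'voltha.ponsim_onu.<device-id>.uni': MetricValuePairs(
                metrics={'tx_64_pkts': 100, 'rx_64_pkts': 100}),
            'voltha.ponsim_onu.<device-id>.pon': MetricValuePairs(
                metrics={'tx_64_pkts': 100, 'rx_64_pkts': 100}),
        })
    # KpiEvent is a protobuf Message, so serialize it to JSON before sending.
    kafka_cluster_proxy.send_message(
        "voltha.kpis", dumps(MessageToDict(kpi_event, True, True)))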
diff --git a/protos/core_adapter.proto b/protos/core_adapter.proto
index 8b52a56..c995b0d 100644
--- a/protos/core_adapter.proto
+++ b/protos/core_adapter.proto
@@ -97,8 +97,8 @@
}
message InterAdapterResponseBody {
- bool success = 1;
- google.protobuf.Any result = 2;
+ bool status = 1;
+ google.protobuf.Any body = 2;
}
message InterAdapterMessage {
diff --git a/protos/ponsim.proto b/protos/ponsim.proto
index e477d98..1cd9e4e 100644
--- a/protos/ponsim.proto
+++ b/protos/ponsim.proto
@@ -44,6 +44,10 @@
repeated PonSimPortMetrics metrics = 2;
}
+message PonSimMetricsRequest {
+ int32 port = 1;
+}
+
service PonSim {
rpc SendFrame(PonSimFrame)
returns (google.protobuf.Empty) {}
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index e7f69f5..570b445 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -30,14 +30,16 @@
type AdapterRequestHandlerProxy struct {
TestMode bool
+ coreInstanceId string
deviceMgr *DeviceManager
lDeviceMgr *LogicalDeviceManager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
}
-func NewAdapterRequestHandlerProxy(dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
+ proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
proxy.lDeviceMgr = ldMgr
proxy.clusterDataProxy = cdProxy
@@ -46,23 +48,34 @@
}
func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
adapter := &voltha.Adapter{}
- if err := ptypes.UnmarshalAny(args[0].Value, adapter); err != nil {
- log.Warnw("cannot-unmarshal-adapter", log.Fields{"error": err})
- return nil, err
+ deviceTypes := &voltha.DeviceTypes{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "adapter":
+ if err := ptypes.UnmarshalAny(arg.Value, adapter); err != nil {
+ log.Warnw("cannot-unmarshal-adapter", log.Fields{"error": err})
+ return nil, err
+ }
+ case "deviceTypes":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceTypes); err != nil {
+ log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
// TODO process the request and store the data in the KV store
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
}
- return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
+ return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 8449ccb..6015e7f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,27 +56,29 @@
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
- backend := model.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
+	// The backend is commented out for now, until the issue between the model
+	// and the KV store is resolved.
+ //backend := model.Backend{
+ // Client: kvClient,
+ // StoreType: cf.KVStoreType,
+ // Host: cf.KVStoreHost,
+ // Port: cf.KVStorePort,
+ // Timeout: cf.KVStoreTimeout,
+ // PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
core.localDataProxy = core.localDataRoot.GetProxy("/", false)
return &core
}
func (core *Core) Start(ctx context.Context) {
- log.Info("starting-core")
+ log.Info("starting-core", log.Fields{"coreId": core.instanceId})
core.startKafkaMessagingProxy(ctx)
log.Info("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- core.registerAdapterRequestHandler(ctx, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
+ core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -135,9 +137,9 @@
return nil
}
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
cdProxy *model.Proxy, ldProxy *model.Proxy) error {
- requestProxy := NewAdapterRequestHandlerProxy(dMgr, ldMgr, cdProxy, ldProxy)
+ requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
log.Info("request-handlers")