VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openonci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/kafka/__init__.py b/python/kafka/__init__.py
new file mode 100644
index 0000000..58aca1e
--- /dev/null
+++ b/python/kafka/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/kafka/adapter_proxy.py b/python/kafka/adapter_proxy.py
new file mode 100644
index 0000000..ddb11da
--- /dev/null
+++ b/python/kafka/adapter_proxy.py
@@ -0,0 +1,110 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between adapters.
+"""
+
+import structlog
+from uuid import uuid4
+from twisted.internet.defer import inlineCallbacks, returnValue
+from container_proxy import ContainerProxy
+from voltha.protos import third_party
+from voltha.protos.inter_container_pb2 import InterAdapterHeader, \
+ InterAdapterMessage
+import time
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdapterProxy(ContainerProxy):
+
+ def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ super(AdapterProxy, self).__init__(kafka_proxy,
+ core_topic,
+ my_listening_topic)
+
+ def _to_string(self, unicode_str):
+ if unicode_str is not None:
+ if type(unicode_str) == unicode:
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return ""
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def send_inter_adapter_message(self,
+ msg,
+ type,
+ from_adapter,
+ to_adapter,
+ to_device_id=None,
+ proxy_device_id=None,
+ message_id=None):
+ """
+ Sends a message directly to an adapter. This is typically used to send
+ proxied messages from one adapter to another. An initial ACK response
+ is sent back to the invoking adapter. If there is subsequent response
+ to be sent back (async) then the adapter receiving this request will
+ use this same API to send back the async response.
+ :param msg : GRPC message to send
+ :param type : InterAdapterMessageType of the message to send
+ :param from_adapter: Name of the adapter making the request.
+ :param to_adapter: Name of the remote adapter.
+ :param to_device_id: The ID of the device for to the message is
+ intended. if it's None then the message is not intended to a specific
+ device. Its interpretation is adapter specific.
+ :param proxy_device_id: The ID of the device which will proxy that
+ message. If it's None then there is no specific device to proxy the
+ message. Its interpretation is adapter specific.
+ :param message_id: A unique number for this transaction that the
+ adapter may use to correlate a request and an async response.
+ """
+
+ try:
+ # validate params
+ assert msg
+ assert from_adapter
+ assert to_adapter
+
+ # Build the inter adapter message
+ h = InterAdapterHeader()
+ h.type = type
+ h.from_topic = self._to_string(from_adapter)
+ h.to_topic = self._to_string(to_adapter)
+ h.to_device_id = self._to_string(to_device_id)
+ h.proxy_device_id = self._to_string(proxy_device_id)
+
+ if message_id:
+ h.id = self._to_string(message_id)
+ else:
+ h.id = uuid4().hex
+
+ h.timestamp = int(round(time.time() * 1000))
+ iaMsg = InterAdapterMessage()
+ iaMsg.header.CopyFrom(h)
+ iaMsg.body.Pack(msg)
+
+ log.debug("sending-inter-adapter-message", header=iaMsg.header)
+ res = yield self.invoke(rpc="process_inter_adapter_message",
+ to_topic=iaMsg.header.to_topic,
+ msg=iaMsg)
+ returnValue(res)
+ except Exception as e:
+ log.exception("error-sending-request", e=e)
diff --git a/python/kafka/adapter_request_facade.py b/python/kafka/adapter_request_facade.py
new file mode 100644
index 0000000..7bf41e5
--- /dev/null
+++ b/python/kafka/adapter_request_facade.py
@@ -0,0 +1,337 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This facade handles kafka-formatted messages from the Core, extracts the kafka
+formatting and forwards the request to the concrete handler.
+"""
+import structlog
+from twisted.internet.defer import inlineCallbacks
+from zope.interface import implementer
+from twisted.internet import reactor
+
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
+from voltha.adapters.interface import IAdapterInterface
+from voltha.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
+from voltha.protos.device_pb2 import Device, ImageDownload
+from voltha.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
+ FlowGroupChanges, ofp_packet_out
+from kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+
+log = structlog.get_logger()
+
+class MacAddressError(BaseException):
+ def __init__(self, error):
+ self.error = error
+
+
+class IDError(BaseException):
+ def __init__(self, error):
+ self.error = error
+
+
+@implementer(IAdapterInterface)
+class AdapterRequestFacade(object):
+ """
+ Gate-keeper between CORE and device adapters.
+
+ On one side it interacts with Core's internal model and update/dispatch
+ mechanisms.
+
+ On the other side, it interacts with the adapters standard interface as
+ defined in
+ """
+
+ def __init__(self, adapter):
+ self.adapter = adapter
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+
+ @inlineCallbacks
+ def stop(self):
+ log.debug('stopping')
+
+ @inlineCallbacks
+ def createKafkaDeviceTopic(self, deviceId):
+ log.debug("subscribing-to-topic", device_id=deviceId)
+ kafka_proxy = get_messaging_proxy()
+ device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+ # yield kafka_proxy.create_topic(topic=device_topic)
+ yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
+ log.debug("subscribed-to-topic", topic=device_topic)
+
+ def adopt_device(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+
+ # Start the creation of a device specific topic to handle all
+ # subsequent requests from the Core. This adapter instance will
+ # handle all requests for that device.
+ reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
+ result = self.adapter.adopt_device(d)
+ # return True, self.adapter.adopt_device(d)
+
+ return True, result
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def get_ofp_device_info(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return True, self.adapter.get_ofp_device_info(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def get_ofp_port_info(self, device, port_no):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ p = IntType()
+ if port_no:
+ port_no.Unpack(p)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.get_ofp_port_info(d, p.val)
+
+ def reconcile_device(self, device):
+ return self.adapter.reconcile_device(device)
+
+ def abandon_device(self, device):
+ return self.adapter.abandon_device(device)
+
+ def disable_device(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return True, self.adapter.disable_device(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def reenable_device(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return True, self.adapter.reenable_device(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def reboot_device(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ return (True, self.adapter.reboot_device(d))
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def download_image(self, device, request):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.download_image(device, request)
+
+ def get_image_download_status(self, device, request):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.get_image_download_status(device, request)
+
+ def cancel_image_download(self, device, request):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.cancel_image_download(device, request)
+
+ def activate_image_update(self, device, request):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.activate_image_update(device, request)
+
+ def revert_image_update(self, device, request):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.revert_image_update(device, request)
+
+
+ def self_test(self, device):
+ return self.adapter.self_test_device(device)
+
+ def delete_device(self, device):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ result = self.adapter.delete_device(d)
+ # return (True, self.adapter.delete_device(d))
+
+ # Before we return, delete the device specific topic as we will no
+ # longer receive requests from the Core for that device
+ kafka_proxy = get_messaging_proxy()
+ device_topic = kafka_proxy.get_default_topic() + "/" + d.id
+ kafka_proxy.unsubscribe(topic=device_topic)
+
+ return (True, result)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+
+ def get_device_details(self, device):
+ return self.adapter.get_device_details(device)
+
+ def update_flows_bulk(self, device, flows, groups):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ f = Flows()
+ if flows:
+ flows.Unpack(f)
+
+ g = FlowGroups()
+ if groups:
+ groups.Unpack(g)
+
+ return (True, self.adapter.update_flows_bulk(d, f, g))
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ f = FlowChanges()
+ if flow_changes:
+ flow_changes.Unpack(f)
+
+ g = FlowGroupChanges()
+ if group_changes:
+ group_changes.Unpack(g)
+
+ return (True, self.adapter.update_flows_incrementally(d, f, g))
+
+ def suppress_alarm(self, filter):
+ return self.adapter.suppress_alarm(filter)
+
+ def unsuppress_alarm(self, filter):
+ return self.adapter.unsuppress_alarm(filter)
+
+ def process_inter_adapter_message(self, msg):
+ m = InterAdapterMessage()
+ if msg:
+ msg.Unpack(m)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="msg-invalid")
+
+ return (True, self.adapter.process_inter_adapter_message(m))
+
+
+ def receive_packet_out(self, deviceId, outPort, packet):
+ try:
+ d_id = StrType()
+ if deviceId:
+ deviceId.Unpack(d_id)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="deviceid-invalid")
+
+ op = IntType()
+ if outPort:
+ outPort.Unpack(op)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="outport-invalid")
+
+ p = ofp_packet_out()
+ if packet:
+ packet.Unpack(p)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="packet-invalid")
+
+ return (True, self.adapter.receive_packet_out(d_id.val, op.val, p))
+ except Exception as e:
+ log.exception("error-processing-receive_packet_out", e=e)
+
diff --git a/python/kafka/container_proxy.py b/python/kafka/container_proxy.py
new file mode 100644
index 0000000..d7f18b4
--- /dev/null
+++ b/python/kafka/container_proxy.py
@@ -0,0 +1,133 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The superclass for all kafka proxy subclasses.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from common.utils.deferred_utils import DeferredWithTimeout, \
+ TimeOutError
+from voltha.core.registry import IComponent
+
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(BaseException):
+ def __init__(self, error):
+ self.error = error
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+
+ def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ self.kafka_proxy = kafka_proxy
+ self.listening_topic = my_listening_topic
+ self.core_topic = core_topic
+ self.default_timeout = 3
+
+ def start(self):
+ log.info('started')
+
+ return self
+
+ def stop(self):
+ log.info('stopped')
+
+ @classmethod
+ def wrap_request(cls, return_cls):
+ def real_wrapper(func):
+ @inlineCallbacks
+ def wrapper(*args, **kw):
+ try:
+ (success, d) = yield func(*args, **kw)
+ if success:
+ log.debug("successful-response", func=func, val=d)
+ if return_cls is not None:
+ rc = return_cls()
+ if d is not None:
+ d.Unpack(rc)
+ returnValue(rc)
+ else:
+ log.debug("successful-response-none", func=func,
+ val=None)
+ returnValue(None)
+ else:
+ log.warn("unsuccessful-request", func=func, args=args,
+ kw=kw)
+ returnValue(d)
+ except Exception as e:
+ log.exception("request-wrapper-exception", func=func, e=e)
+ raise
+
+ return wrapper
+
+ return real_wrapper
+
+ @inlineCallbacks
+ def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
+ @inlineCallbacks
+ def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
+ try:
+ log.debug("sending-request",
+ rpc=rpc,
+ to_topic=to_topic,
+ reply_topic=reply_topic)
+ if to_topic is None:
+ to_topic = self.core_topic
+ if reply_topic is None:
+ reply_topic = self.listening_topic
+ result = yield self.kafka_proxy.send_request(rpc=rpc,
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ callback=None,
+ **kwargs)
+ if not m_callback.called:
+ m_callback.callback(result)
+ else:
+ log.debug('timeout-already-occurred', rpc=rpc)
+ except Exception as e:
+ log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+ if not m_callback.called:
+ m_callback.errback(failure.Failure())
+
+ # We are going to resend the request on the to_topic if there is a
+ # timeout error. This time the timeout will be longer. If the second
+ # request times out then we will send the request to the default
+ # core_topic.
+ timeouts = [self.default_timeout,
+ self.default_timeout*2,
+ self.default_timeout]
+ retry = 0
+ max_retry = 2
+ for timeout in timeouts:
+ cb = DeferredWithTimeout(timeout=timeout)
+ _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+ try:
+ res = yield cb
+ returnValue(res)
+ except TimeOutError as e:
+ log.warn('invoke-timeout', e=e)
+ if retry == max_retry:
+ raise e
+ retry += 1
+ if retry == max_retry:
+ to_topic = self.core_topic
diff --git a/python/kafka/core_proxy.py b/python/kafka/core_proxy.py
new file mode 100644
index 0000000..4f4579b
--- /dev/null
+++ b/python/kafka/core_proxy.py
@@ -0,0 +1,344 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between CORE and an adapter.
+"""
+import structlog
+from google.protobuf.message import Message
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from container_proxy import ContainerProxy
+from voltha.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from voltha.protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
+from voltha.protos.device_pb2 import Device, Ports
+from voltha.protos.voltha_pb2 import CoreInstance
+
+log = structlog.get_logger()
+
+
+def createSubTopic(*args):
+ return '_'.join(args)
+
+class CoreProxy(ContainerProxy):
+
+ def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+ super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ my_listening_topic)
+
+ @ContainerProxy.wrap_request(CoreInstance)
+ @inlineCallbacks
+ def register(self, adapter, deviceTypes):
+ log.debug("register")
+ try:
+ res = yield self.invoke(rpc="Register",
+ adapter=adapter,
+ deviceTypes=deviceTypes)
+ log.info("registration-returned", res=res)
+ returnValue(res)
+ except Exception as e:
+ log.exception("registration-exception", e=e)
+ raise
+
+ @ContainerProxy.wrap_request(Device)
+ @inlineCallbacks
+ def get_device(self, device_id):
+ log.debug("get-device")
+ id = ID()
+ id.id = device_id
+ # Once we have a device being managed, all communications between the
+ # the adapter and the core occurs over a topic associated with that
+ # device
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="GetDevice",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(Device)
+ @inlineCallbacks
+ def get_child_device(self, parent_device_id, **kwargs):
+ raise NotImplementedError()
+
+ @ContainerProxy.wrap_request(Ports)
+ @inlineCallbacks
+ def get_ports(self, device_id, port_type):
+ id = ID()
+ id.id = device_id
+ p_type = IntType()
+ p_type.val = port_type
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="GetPorts",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id,
+ port_type=p_type)
+ returnValue(res)
+
+ def get_child_devices(self, parent_device_id):
+ raise NotImplementedError()
+
+ def get_child_device_with_proxy_address(self, proxy_address):
+ raise NotImplementedError()
+
+ def _to_proto(self, **kwargs):
+ encoded = {}
+ for k, v in kwargs.iteritems():
+ if isinstance(v, Message):
+ encoded[k] = v
+ elif type(v) == int:
+ i_proto = IntType()
+ i_proto.val = v
+ encoded[k] = i_proto
+ elif type(v) == str:
+ s_proto = StrType()
+ s_proto.val = v
+ encoded[k] = s_proto
+ elif type(v) == bool:
+ b_proto = BoolType()
+ b_proto.val = v
+ encoded[k] = b_proto
+ return encoded
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def child_device_detected(self,
+ parent_device_id,
+ parent_port_no,
+ child_device_type,
+ channel_id,
+ **kw):
+ id = ID()
+ id.id = parent_device_id
+ ppn = IntType()
+ ppn.val = parent_port_no
+ cdt = StrType()
+ cdt.val = child_device_type
+ channel = IntType()
+ channel.val = channel_id
+ to_topic = createSubTopic(self.core_topic, parent_device_id)
+ reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ args = self._to_proto(**kw)
+ res = yield self.invoke(rpc="ChildDeviceDetected",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ parent_device_id=id,
+ parent_port_no=ppn,
+ child_device_type=cdt,
+ channel_id=channel,
+ **args)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def device_update(self, device):
+ log.debug("device_update")
+ to_topic = createSubTopic(self.core_topic, device.id)
+ reply_topic = createSubTopic(self.listening_topic, device.id)
+ res = yield self.invoke(rpc="DeviceUpdate",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device=device)
+ returnValue(res)
+
+ def child_device_removed(parent_device_id, child_device_id):
+ raise NotImplementedError()
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def device_state_update(self, device_id,
+ oper_status=None,
+ connect_status=None):
+ id = ID()
+ id.id = device_id
+ o_status = IntType()
+ if oper_status or oper_status == OperStatus.UNKNOWN:
+ o_status.val = oper_status
+ else:
+ o_status.val = -1
+ c_status = IntType()
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
+ c_status.val = connect_status
+ else:
+ c_status.val = -1
+
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="DeviceStateUpdate",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id,
+ oper_status=o_status,
+ connect_status=c_status)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def children_state_update(self, device_id,
+ oper_status=None,
+ connect_status=None):
+ id = ID()
+ id.id = device_id
+ o_status = IntType()
+ if oper_status or oper_status == OperStatus.UNKNOWN:
+ o_status.val = oper_status
+ else:
+ o_status.val = -1
+ c_status = IntType()
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
+ c_status.val = connect_status
+ else:
+ c_status.val = -1
+
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="ChildrenStateUpdate",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id,
+ oper_status=o_status,
+ connect_status=c_status)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def port_state_update(self,
+ device_id,
+ port_type,
+ port_no,
+ oper_status):
+ id = ID()
+ id.id = device_id
+ pt = IntType()
+ pt.val = port_type
+ pNo = IntType()
+ pNo.val = port_no
+ o_status = IntType()
+ o_status.val = oper_status
+
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="PortStateUpdate",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=id,
+ port_type=pt,
+ port_no=pNo,
+ oper_status=o_status)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def child_devices_state_update(self, parent_device_id,
+ oper_status=None,
+ connect_status=None):
+
+ id = ID()
+ id.id = parent_device_id
+ o_status = IntType()
+ if oper_status or oper_status == OperStatus.UNKNOWN:
+ o_status.val = oper_status
+ else:
+ o_status.val = -1
+ c_status = IntType()
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
+ c_status.val = connect_status
+ else:
+ c_status.val = -1
+
+ to_topic = createSubTopic(self.core_topic, parent_device_id)
+ reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+ res = yield self.invoke(rpc="child_devices_state_update",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ parent_device_id=id,
+ oper_status=o_status,
+ connect_status=c_status)
+ returnValue(res)
+
+ def child_devices_removed(parent_device_id):
+ raise NotImplementedError()
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def device_pm_config_update(self, device_pm_config, init=False):
+ log.debug("device_pm_config_update")
+ b = BoolType()
+ b.val = init
+ to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+ reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+ res = yield self.invoke(rpc="DevicePMConfigUpdate",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_pm_config=device_pm_config,
+ init=b)
+ returnValue(res)
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def port_created(self, device_id, port):
+ log.debug("port_created")
+ proto_id = ID()
+ proto_id.id = device_id
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="PortCreated",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=proto_id,
+ port=port)
+ returnValue(res)
+
+ def port_removed(device_id, port):
+ raise NotImplementedError()
+
+ def ports_enabled(device_id):
+ raise NotImplementedError()
+
+ def ports_disabled(device_id):
+ raise NotImplementedError()
+
+ def ports_oper_status_update(device_id, oper_status):
+ raise NotImplementedError()
+
+ def image_download_update(img_dnld):
+ raise NotImplementedError()
+
+ def image_download_deleted(img_dnld):
+ raise NotImplementedError()
+
+ @ContainerProxy.wrap_request(None)
+ @inlineCallbacks
+ def send_packet_in(self, device_id, port, packet):
+ log.debug("send_packet_in", device_id=device_id)
+ proto_id = ID()
+ proto_id.id = device_id
+ p = IntType()
+ p.val = port
+ pac = Packet()
+ pac.payload = packet
+ to_topic = createSubTopic(self.core_topic, device_id)
+ reply_topic = createSubTopic(self.listening_topic, device_id)
+ res = yield self.invoke(rpc="PacketIn",
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ device_id=proto_id,
+ port=p,
+ packet=pac)
+ returnValue(res)
diff --git a/python/kafka/event_bus_publisher.py b/python/kafka/event_bus_publisher.py
new file mode 100644
index 0000000..8020bf5
--- /dev/null
+++ b/python/kafka/event_bus_publisher.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A gateway between the internal event bus and the Kafka publisher proxy
+to publish select topics and messages posted to the Voltha-internal event
+bus toward the external world.
+"""
+import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+
+log = structlog.get_logger()
+
+
+class EventBusPublisher(object):
+
+ def __init__(self, kafka_proxy, config):
+ self.kafka_proxy = kafka_proxy
+ self.config = config
+ self.topic_mappings = config.get('topic_mappings', {})
+ self.event_bus = EventBusClient()
+ self.subscriptions = None
+
+ def start(self):
+ log.debug('starting')
+ self.subscriptions = list()
+ self._setup_subscriptions(self.topic_mappings)
+ log.info('started')
+ return self
+
+ def stop(self):
+ try:
+ log.debug('stopping-event-bus')
+ if self.subscriptions:
+ for subscription in self.subscriptions:
+ self.event_bus.unsubscribe(subscription)
+ log.info('stopped-event-bus')
+ except Exception, e:
+ log.exception('failed-stopping-event-bus', e=e)
+ return
+
+ def _setup_subscriptions(self, mappings):
+
+ for event_bus_topic, mapping in mappings.iteritems():
+
+ kafka_topic = mapping.get('kafka_topic', None)
+
+ if kafka_topic is None:
+ log.error('no-kafka-topic-in-config',
+ event_bus_topic=event_bus_topic,
+ mapping=mapping)
+ continue
+
+ self.subscriptions.append(self.event_bus.subscribe(
+ event_bus_topic,
+ # to avoid Python late-binding to the last registered
+ # kafka_topic, we force instant binding with the default arg
+ lambda _, m, k=kafka_topic: self.forward(k, m)))
+
+ log.info('event-to-kafka', kafka_topic=kafka_topic,
+ event_bus_topic=event_bus_topic)
+
+ def forward(self, kafka_topic, msg):
+ try:
+ # convert to JSON string if msg is a protobuf msg
+ if isinstance(msg, Message):
+ msg = dumps(MessageToDict(msg, True, True))
+ log.debug('forward-event-bus-publisher')
+ self.kafka_proxy.send_message(kafka_topic, msg)
+ except Exception, e:
+ log.exception('failed-forward-event-bus-publisher', e=e)
+
diff --git a/python/kafka/kafka_inter_container_library.py b/python/kafka/kafka_inter_container_library.py
new file mode 100644
index 0000000..cf51684
--- /dev/null
+++ b/python/kafka/kafka_inter_container_library.py
@@ -0,0 +1,570 @@
+#!/usr/bin/env python
+
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from uuid import uuid4
+
+import structlog
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
+ DeferredQueue, gatherResults
+from zope.interface import implementer
+
+from common.utils import asleep
+from voltha.core.registry import IComponent
+from kafka_proxy import KafkaProxy, get_kafka_proxy
+from voltha.protos.inter_container_pb2 import MessageType, Argument, \
+ InterContainerRequestBody, InterContainerMessage, Header, \
+ InterContainerResponseBody
+
+log = structlog.get_logger()
+
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
+
+class KafkaMessagingError(BaseException):
+ def __init__(self, error):
+ self.error = error
+
+
+@implementer(IComponent)
+class IKafkaMessagingProxy(object):
+ _kafka_messaging_instance = None
+
+ def __init__(self,
+ kafka_host_port,
+ kv_store,
+ default_topic,
+ group_id_prefix,
+ target_cls):
+ """
+ Initialize the kafka proxy. This is a singleton (may change to
+ non-singleton if performance is better)
+ :param kafka_host_port: Kafka host and port
+ :param kv_store: Key-Value store
+ :param default_topic: Default topic to subscribe to
+ :param target_cls: target class - method of that class is invoked
+ when a message is received on the default_topic
+ """
+ # return an exception if the object already exist
+ if IKafkaMessagingProxy._kafka_messaging_instance:
+ raise Exception(
+ 'Singleton-exist', cls=IKafkaMessagingProxy)
+
+ log.debug("Initializing-KafkaProxy")
+ self.kafka_host_port = kafka_host_port
+ self.kv_store = kv_store
+ self.default_topic = default_topic
+ self.default_group_id = "_".join((group_id_prefix, default_topic))
+ self.target_cls = target_cls
+ self.topic_target_cls_map = {}
+ self.topic_callback_map = {}
+ self.subscribers = {}
+ self.kafka_proxy = None
+ self.transaction_id_deferred_map = {}
+ self.received_msg_queue = DeferredQueue()
+ self.stopped = False
+
+ self.init_time = 0
+ self.init_received_time = 0
+
+ self.init_resp_time = 0
+ self.init_received_resp_time = 0
+
+ self.num_messages = 0
+ self.total_time = 0
+ self.num_responses = 0
+ self.total_time_responses = 0
+ log.debug("KafkaProxy-initialized")
+
+ def start(self):
+ try:
+ log.debug("KafkaProxy-starting")
+
+ # Get the kafka proxy instance. If it does not exist then
+ # create it
+ self.kafka_proxy = get_kafka_proxy()
+ if self.kafka_proxy == None:
+ KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
+ self.kafka_proxy = get_kafka_proxy()
+
+ # Subscribe the default topic and target_cls
+ self.topic_target_cls_map[self.default_topic] = self.target_cls
+
+ # Start the queue to handle incoming messages
+ reactor.callLater(0, self._received_message_processing_loop)
+
+ # Subscribe using the default topic and default group id. Whenever
+ # a message is received on that topic then teh target_cls will be
+ # invoked.
+ reactor.callLater(0, self.subscribe,
+ topic=self.default_topic,
+ target_cls=self.target_cls,
+ group_id=self.default_group_id)
+
+ # Setup the singleton instance
+ IKafkaMessagingProxy._kafka_messaging_instance = self
+ log.debug("KafkaProxy-started")
+ except Exception as e:
+ log.exception("Failed-to-start-proxy", e=e)
+
+ def stop(self):
+ """
+ Invoked to stop the kafka proxy
+ :return: None on success, Exception on failure
+ """
+ log.debug("Stopping-messaging-proxy ...")
+ try:
+ # Stop the kafka proxy. This will stop all the consumers
+ # and producers
+ self.stopped = True
+ self.kafka_proxy.stop()
+ log.debug("Messaging-proxy-stopped.")
+ except Exception as e:
+ log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+
+ def get_target_cls(self):
+ return self.target_cls
+
+ def get_default_topic(self):
+ return self.default_topic
+
+ @inlineCallbacks
+ def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+ target_cls=None):
+ try:
+ log.debug("subscribing-to-topic-start", topic=topic)
+ yield self.kafka_proxy.subscribe(topic,
+ self._enqueue_received_group_message,
+ group_id, offset)
+
+ if target_cls is not None and callback is None:
+ # Scenario #1
+ if topic not in self.topic_target_cls_map:
+ self.topic_target_cls_map[topic] = target_cls
+ elif target_cls is None and callback is not None:
+ # Scenario #2
+ log.debug("custom-callback", topic=topic,
+ callback_map=self.topic_callback_map)
+ if topic not in self.topic_callback_map:
+ self.topic_callback_map[topic] = [callback]
+ else:
+ self.topic_callback_map[topic].extend([callback])
+ else:
+ log.warn("invalid-parameters")
+
+ returnValue(True)
+ except Exception as e:
+ log.exception("Exception-during-subscription", e=e)
+ returnValue(False)
+
+ @inlineCallbacks
+ def subscribe(self, topic, callback=None, target_cls=None,
+ max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
+ """
+ Scenario 1: invoked to subscribe to a specific topic with a
+ target_cls to invoke when a message is received on that topic. This
+ handles the case of request/response where this library performs the
+ heavy lifting. In this case the m_callback must to be None
+
+ Scenario 2: invoked to subscribe to a specific topic with a
+ specific callback to invoke when a message is received on that topic.
+ This handles the case where the caller wants to process the message
+ received itself. In this case the target_cls must to be None
+
+ :param topic: topic to subscribe to
+ :param callback: Callback to invoke when a message is received on
+ the topic. Either one of callback or target_cls needs can be none
+ :param target_cls: Target class to use when a message is
+ received on the topic. There can only be 1 target_cls per topic.
+ Either one of callback or target_cls needs can be none
+ :param max_retry: the number of retries before reporting failure
+ to subscribe. This caters for scenario where the kafka topic is not
+ ready.
+ :param group_id: The ID of the group the consumer is subscribing to
+ :param offset: The topic offset on the kafka bus from where message consumption will start
+ :return: True on success, False on failure
+ """
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def _backoff(msg, retries):
+ wait_time = RETRY_BACKOFF[min(retries,
+ len(RETRY_BACKOFF) - 1)]
+ log.info(msg, retry_in=wait_time)
+ return asleep.asleep(wait_time)
+
+ log.debug("subscribing", topic=topic, group_id=group_id,
+ callback=callback, target=target_cls)
+
+ retry = 0
+ subscribed = False
+ if group_id is None:
+ group_id = self.default_group_id
+ while not subscribed:
+ subscribed = yield self._subscribe_group_consumer(group_id, topic,
+ callback=callback,
+ target_cls=target_cls,
+ offset=offset)
+ if subscribed:
+ returnValue(True)
+ elif retry > max_retry:
+ returnValue(False)
+ else:
+ _backoff("subscription-not-complete", retry)
+ retry += 1
+
+ def unsubscribe(self, topic, callback=None, target_cls=None):
+ """
+ Invoked when unsubscribing to a topic
+ :param topic: topic to unsubscribe from
+ :param callback: the callback used when subscribing to the topic, if any
+ :param target_cls: the targert class used when subscribing to the topic, if any
+ :return: None on success or Exception on failure
+ """
+ log.debug("Unsubscribing-to-topic", topic=topic)
+
+ try:
+ self.kafka_proxy.unsubscribe(topic,
+ self._enqueue_received_group_message)
+
+ if callback is None and target_cls is None:
+ log.error("both-call-and-target-cls-cannot-be-none",
+ topic=topic)
+ raise KafkaMessagingError(
+ error="both-call-and-target-cls-cannot-be-none")
+
+ if target_cls is not None and topic in self.topic_target_cls_map:
+ del self.topic_target_cls_map[topic]
+
+ if callback is not None and topic in self.topic_callback_map:
+ index = 0
+ for cb in self.topic_callback_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callback_map[topic]):
+ self.topic_callback_map[topic].pop(index)
+
+ if len(self.topic_callback_map[topic]) == 0:
+ del self.topic_callback_map[topic]
+ except Exception as e:
+ log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+ e=e)
+ return e
+
+ @inlineCallbacks
+ def _enqueue_received_group_message(self, msg):
+ """
+ Internal method to continuously queue all received messaged
+ irrespective of topic
+ :param msg: Received message
+ :return: None on success, Exception on failure
+ """
+ try:
+ log.debug("received-msg", msg=msg)
+ yield self.received_msg_queue.put(msg)
+ except Exception as e:
+ log.exception("Failed-enqueueing-received-message", e=e)
+
+ @inlineCallbacks
+ def _received_message_processing_loop(self):
+ """
+ Internal method to continuously process all received messages one
+ at a time
+ :return: None on success, Exception on failure
+ """
+ while True:
+ try:
+ message = yield self.received_msg_queue.get()
+ yield self._process_message(message)
+ if self.stopped:
+ break
+ except Exception as e:
+ log.exception("Failed-dequeueing-received-message", e=e)
+
+ def _to_string(self, unicode_str):
+ if unicode_str is not None:
+ if type(unicode_str) == unicode:
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return None
+
+ def _format_request(self,
+ rpc,
+ to_topic,
+ reply_topic,
+ **kwargs):
+ """
+ Format a request to send over kafka
+ :param rpc: Requested remote API
+ :param to_topic: Topic to send the request
+ :param reply_topic: Topic to receive the resulting response, if any
+ :param kwargs: Dictionary of key-value pairs to pass as arguments to
+ the remote rpc API.
+ :return: A InterContainerMessage message type on success or None on
+ failure
+ """
+ try:
+ transaction_id = uuid4().hex
+ request = InterContainerMessage()
+ request_body = InterContainerRequestBody()
+ request.header.id = transaction_id
+ request.header.type = MessageType.Value("REQUEST")
+ request.header.from_topic = reply_topic
+ request.header.to_topic = to_topic
+
+ response_required = False
+ if reply_topic:
+ request_body.reply_to_topic = reply_topic
+ request_body.response_required = True
+ response_required = True
+
+ request.header.timestamp = int(round(time.time() * 1000))
+ request_body.rpc = rpc
+ for a, b in kwargs.iteritems():
+ arg = Argument()
+ arg.key = a
+ try:
+ arg.value.Pack(b)
+ request_body.args.extend([arg])
+ except Exception as e:
+ log.exception("Failed-parsing-value", e=e)
+ request.body.Pack(request_body)
+ return request, transaction_id, response_required
+ except Exception as e:
+ log.exception("formatting-request-failed",
+ rpc=rpc,
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ args=kwargs)
+ return None, None, None
+
+ def _format_response(self, msg_header, msg_body, status):
+ """
+ Format a response
+ :param msg_header: The header portion of a received request
+ :param msg_body: The response body
+ :param status: True is this represents a successful response
+ :return: a InterContainerMessage message type
+ """
+ try:
+ assert isinstance(msg_header, Header)
+ response = InterContainerMessage()
+ response_body = InterContainerResponseBody()
+ response.header.id = msg_header.id
+ response.header.timestamp = int(
+ round(time.time() * 1000))
+ response.header.type = MessageType.Value("RESPONSE")
+ response.header.from_topic = msg_header.to_topic
+ response.header.to_topic = msg_header.from_topic
+ if msg_body is not None:
+ response_body.result.Pack(msg_body)
+ response_body.success = status
+ response.body.Pack(response_body)
+ return response
+ except Exception as e:
+ log.exception("formatting-response-failed", header=msg_header,
+ body=msg_body, status=status, e=e)
+ return None
+
+ def _parse_response(self, msg):
+ try:
+ message = InterContainerMessage()
+ message.ParseFromString(msg)
+ resp = InterContainerResponseBody()
+ if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
+ message.body.Unpack(resp)
+ else:
+ log.debug("unsupported-msg", msg_type=type(message.body))
+ return None
+ log.debug("parsed-response", input=message, output=resp)
+ return resp
+ except Exception as e:
+ log.exception("parsing-response-failed", msg=msg, e=e)
+ return None
+
+ @inlineCallbacks
+ def _process_message(self, m):
+ """
+ Default internal method invoked for every batch of messages received
+ from Kafka.
+ """
+
+ def _toDict(args):
+ """
+ Convert a repeatable Argument type into a python dictionary
+ :param args: Repeatable core_adapter.Argument type
+ :return: a python dictionary
+ """
+ if args is None:
+ return None
+ result = {}
+ for arg in args:
+ assert isinstance(arg, Argument)
+ result[arg.key] = arg.value
+ return result
+
+ current_time = int(round(time.time() * 1000))
+ # log.debug("Got Message", message=m)
+ try:
+ val = m.value()
+ # val = m.message.value
+ # print m.topic
+
+ # Go over customized callbacks first
+ m_topic = m.topic()
+ if m_topic in self.topic_callback_map:
+ for c in self.topic_callback_map[m_topic]:
+ yield c(val)
+
+ # Check whether we need to process request/response scenario
+ if m_topic not in self.topic_target_cls_map:
+ return
+
+ # Process request/response scenario
+ message = InterContainerMessage()
+ message.ParseFromString(val)
+
+ if message.header.type == MessageType.Value("REQUEST"):
+ # Get the target class for that specific topic
+ targetted_topic = self._to_string(message.header.to_topic)
+ msg_body = InterContainerRequestBody()
+ if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
+ message.body.Unpack(msg_body)
+ else:
+ log.debug("unsupported-msg", msg_type=type(message.body))
+ return
+ if targetted_topic in self.topic_target_cls_map:
+ if msg_body.args:
+ log.debug("message-body-args-present", body=msg_body)
+ (status, res) = yield getattr(
+ self.topic_target_cls_map[targetted_topic],
+ self._to_string(msg_body.rpc))(
+ **_toDict(msg_body.args))
+ else:
+ log.debug("message-body-args-absent", body=msg_body,
+ rpc=msg_body.rpc)
+ (status, res) = yield getattr(
+ self.topic_target_cls_map[targetted_topic],
+ self._to_string(msg_body.rpc))()
+ if msg_body.response_required:
+ response = self._format_response(
+ msg_header=message.header,
+ msg_body=res,
+ status=status,
+ )
+ if response is not None:
+ res_topic = self._to_string(
+ response.header.to_topic)
+ self._send_kafka_message(res_topic, response)
+
+ log.debug("Response-sent", response=response.body,
+ to_topic=res_topic)
+ elif message.header.type == MessageType.Value("RESPONSE"):
+ trns_id = self._to_string(message.header.id)
+ if trns_id in self.transaction_id_deferred_map:
+ resp = self._parse_response(val)
+
+ self.transaction_id_deferred_map[trns_id].callback(resp)
+ else:
+ log.error("!!INVALID-TRANSACTION-TYPE!!")
+
+ except Exception as e:
+ log.exception("Failed-to-process-message", message=m, e=e)
+
+ @inlineCallbacks
+ def _send_kafka_message(self, topic, msg):
+ try:
+ yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
+ except Exception, e:
+ log.exception("Failed-sending-message", message=msg, e=e)
+
+ @inlineCallbacks
+ def send_request(self,
+ rpc,
+ to_topic,
+ reply_topic=None,
+ callback=None,
+ **kwargs):
+ """
+ Invoked to send a message to a remote container and receive a
+ response if required.
+ :param rpc: The remote API to invoke
+ :param to_topic: Send the message to this kafka topic
+ :param reply_topic: If not None then a response is expected on this
+ topic. If set to None then no response is required.
+ :param callback: Callback to invoke when a response is received.
+ :param kwargs: Key-value pairs representing arguments to pass to the
+ rpc remote API.
+ :return: Either no response is required, or a response is returned
+ via the callback or the response is a tuple of (status, return_cls)
+ """
+ try:
+ # Ensure all strings are not unicode encoded
+ rpc = self._to_string(rpc)
+ to_topic = self._to_string(to_topic)
+ reply_topic = self._to_string(reply_topic)
+
+ request, transaction_id, response_required = \
+ self._format_request(
+ rpc=rpc,
+ to_topic=to_topic,
+ reply_topic=reply_topic,
+ **kwargs)
+
+ if request is None:
+ return
+
+ # Add the transaction to the transaction map before sending the
+ # request. This will guarantee the eventual response will be
+ # processed.
+ wait_for_result = None
+ if response_required:
+ wait_for_result = Deferred()
+ self.transaction_id_deferred_map[
+ self._to_string(request.header.id)] = wait_for_result
+
+ yield self._send_kafka_message(to_topic, request)
+ log.debug("message-sent", to_topic=to_topic,
+ from_topic=reply_topic)
+
+ if response_required:
+ res = yield wait_for_result
+
+ if res is None or not res.success:
+ raise KafkaMessagingError(error="Failed-response:{"
+ "}".format(res))
+
+ # Remove the transaction from the transaction map
+ del self.transaction_id_deferred_map[transaction_id]
+
+ log.debug("send-message-response", rpc=rpc, result=res)
+
+ if callback:
+ callback((res.success, res.result))
+ else:
+ returnValue((res.success, res.result))
+ except Exception as e:
+ log.exception("Exception-sending-request", e=e)
+ raise KafkaMessagingError(error=e)
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_messaging_proxy():
+ return IKafkaMessagingProxy._kafka_messaging_instance
diff --git a/python/kafka/kafka_proxy.py b/python/kafka/kafka_proxy.py
new file mode 100644
index 0000000..64da9a8
--- /dev/null
+++ b/python/kafka/kafka_proxy.py
@@ -0,0 +1,338 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from confluent_kafka import Producer as _kafkaProducer
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
+from zope.interface import implementer
+
+from common.utils.consulhelpers import get_endpoint_from_consul
+from event_bus_publisher import EventBusPublisher
+from voltha.core.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
+
+log = get_logger()
+
+
+@implementer(IComponent)
+class KafkaProxy(object):
+ """
+ This is a singleton proxy kafka class to hide the kafka client details. This
+ proxy uses confluent-kafka-python as the kafka client. Since that client is
+ not a Twisted client then requests to that client are wrapped with
+ twisted.internet.threads.deferToThread to avoid any potential blocking of
+ the Twisted loop.
+ """
+ _kafka_instance = None
+
+ def __init__(self,
+ consul_endpoint='localhost:8500',
+ kafka_endpoint='localhost:9092',
+ ack_timeout=1000,
+ max_req_attempts=10,
+ consumer_poll_timeout=10,
+ config={}):
+
+ # return an exception if the object already exist
+ if KafkaProxy._kafka_instance:
+ raise Exception('Singleton exist for :{}'.format(KafkaProxy))
+
+ log.debug('initializing', endpoint=kafka_endpoint)
+ self.ack_timeout = ack_timeout
+ self.max_req_attempts = max_req_attempts
+ self.consul_endpoint = consul_endpoint
+ self.kafka_endpoint = kafka_endpoint
+ self.config = config
+ self.kclient = None
+ self.kproducer = None
+ self.event_bus_publisher = None
+ self.stopping = False
+ self.faulty = False
+ self.consumer_poll_timeout = consumer_poll_timeout
+ self.topic_consumer_map = {}
+ self.topic_callbacks_map = {}
+ self.topic_any_map_lock = threading.Lock()
+ log.debug('initialized', endpoint=kafka_endpoint)
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+ self._get_kafka_producer()
+ KafkaProxy._kafka_instance = self
+ self.event_bus_publisher = yield EventBusPublisher(
+ self, self.config.get('event_bus_publisher', {})).start()
+ log.info('started')
+ KafkaProxy.faulty = False
+ self.stopping = False
+ returnValue(self)
+
+ @inlineCallbacks
+ def stop(self):
+ try:
+ log.debug('stopping-kafka-proxy')
+ self.stopping = True
+ try:
+ if self.kclient:
+ yield self.kclient.close()
+ self.kclient = None
+ log.debug('stopped-kclient-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+
+ try:
+ if self.kproducer:
+ yield self.kproducer.flush()
+ self.kproducer = None
+ log.debug('stopped-kproducer-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+
+ # Stop all consumers
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug('stopping-consumers-kafka-proxy')
+ for _, c in self.topic_consumer_map.iteritems():
+ yield deferToThread(c.close)
+ self.topic_consumer_map.clear()
+ self.topic_callbacks_map.clear()
+ log.debug('stopped-consumers-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+ # try:
+ # if self.event_bus_publisher:
+ # yield self.event_bus_publisher.stop()
+ # self.event_bus_publisher = None
+ # log.debug('stopped-event-bus-publisher-kafka-proxy')
+ # except Exception, e:
+ # log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+ # pass
+
+ log.debug('stopped-kafka-proxy')
+
+ except Exception, e:
+ self.kclient = None
+ self.kproducer = None
+ # self.event_bus_publisher = None
+ log.exception('failed-stopped-kafka-proxy', e=e)
+ pass
+
+ def _get_kafka_producer(self):
+
+ try:
+
+ if self.kafka_endpoint.startswith('@'):
+ try:
+ _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+ self.kafka_endpoint[
+ 1:])
+ log.debug('found-kafka-service', endpoint=_k_endpoint)
+
+ except Exception as e:
+ log.exception('no-kafka-service-in-consul', e=e)
+
+ self.kproducer = None
+ self.kclient = None
+ return
+ else:
+ _k_endpoint = self.kafka_endpoint
+ self.kproducer = _kafkaProducer(
+ {'bootstrap.servers': _k_endpoint,
+ }
+ )
+ pass
+ except Exception, e:
+ log.exception('failed-get-kafka-producer', e=e)
+ return
+
+ @inlineCallbacks
+ def _wait_for_messages(self, consumer, topic):
+ while True:
+ try:
+ msg = yield deferToThread(consumer.poll,
+ self.consumer_poll_timeout)
+
+ if self.stopping:
+ log.debug("stop-request-recieved", topic=topic)
+ break
+
+ if msg is None:
+ continue
+ if msg.error():
+ # This typically is received when there are no more messages
+ # to read from kafka. Ignore.
+ continue
+
+ # Invoke callbacks
+ for cb in self.topic_callbacks_map[topic]:
+ yield cb(msg)
+ except Exception as e:
+ log.debug("exception-receiving-msg", topic=topic, e=e)
+
+ @inlineCallbacks
+ def subscribe(self, topic, callback, groupId, offset='latest'):
+ """
+ subscribe allows a caller to subscribe to a given kafka topic. This API
+ always create a group consumer.
+ :param topic - the topic to subscribe to
+ :param callback - the callback to invoke whenever a message is received
+ on that topic
+ :param groupId - the groupId for this consumer. In the current
+ implementation there is a one-to-one mapping between a topic and a
+ groupId. In other words, once a groupId is used for a given topic then
+ we won't be able to create another groupId for the same topic.
+ :param offset: the kafka offset from where the consumer will start
+ consuming messages
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ if topic in self.topic_consumer_map:
+ # Just add the callback
+ if topic in self.topic_callbacks_map:
+ self.topic_callbacks_map[topic].append(callback)
+ else:
+ self.topic_callbacks_map[topic] = [callback]
+ return
+
+ # Create consumer for that topic
+ c = Consumer({
+ 'bootstrap.servers': self.kafka_endpoint,
+ 'group.id': groupId,
+ 'auto.offset.reset': offset
+ })
+ yield deferToThread(c.subscribe, [topic])
+ # c.subscribe([topic])
+ self.topic_consumer_map[topic] = c
+ self.topic_callbacks_map[topic] = [callback]
+ # Start the consumer
+ reactor.callLater(0, self._wait_for_messages, c, topic)
+ except Exception, e:
+ log.exception("topic-subscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
+
+ @inlineCallbacks
+ def unsubscribe(self, topic, callback):
+ """
+ Unsubscribe to a given topic. Since there they be multiple callers
+ consuming from the same topic then to ensure only the relevant caller
+ gets unsubscribe then the callback is used as a differentiator. The
+ kafka consumer will be closed when there are no callbacks required.
+ :param topic: topic to unsubscribe
+ :param callback: callback the caller used when subscribing to the topic.
+ If multiple callers have subscribed to a topic using the same callback
+ then the first callback on the list will be removed.
+ :return:None
+ """
+ try:
+ self.topic_any_map_lock.acquire()
+ log.debug("unsubscribing-to-topic", topic=topic)
+ if topic in self.topic_callbacks_map:
+ index = 0
+ for cb in self.topic_callbacks_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callbacks_map[topic]):
+ self.topic_callbacks_map[topic].pop(index)
+
+ if len(self.topic_callbacks_map[topic]) == 0:
+ # Stop the consumer
+ if topic in self.topic_consumer_map:
+ yield deferToThread(
+ self.topic_consumer_map[topic].close)
+ del self.topic_consumer_map[topic]
+ del self.topic_callbacks_map[topic]
+ log.debug("unsubscribed-to-topic", topic=topic)
+ else:
+ log.debug("consumers-for-topic-still-exist", topic=topic,
+ num=len(self.topic_callbacks_map[topic]))
+ except Exception, e:
+ log.exception("topic-unsubscription-error", e=e)
+ finally:
+ self.topic_any_map_lock.release()
+ log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+ @inlineCallbacks
+ def send_message(self, topic, msg, key=None):
+ assert topic is not None
+ assert msg is not None
+
+ # first check whether we have a kafka producer. If there is none
+ # then try to get one - this happens only when we try to lookup the
+ # kafka service from consul
+ try:
+ if self.faulty is False:
+
+ if self.kproducer is None:
+ self._get_kafka_producer()
+ # Lets the next message request do the retry if still a failure
+ if self.kproducer is None:
+ log.error('no-kafka-producer',
+ endpoint=self.kafka_endpoint)
+ return
+
+ log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
+ msgs = [msg]
+
+ if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+ d = deferToThread(self.kproducer.produce, topic, msg, key)
+ yield d
+ log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+ # send a lightweight poll to avoid an exception after 100k messages.
+ d1 = deferToThread(self.kproducer.poll, 0)
+ yield d1
+ else:
+ return
+
+ except Exception, e:
+ self.faulty = True
+ log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+ e=e)
+
+ # set the kafka producer to None. This is needed if the
+ # kafka docker went down and comes back up with a different
+ # port number.
+ if self.stopping is False:
+ log.debug('stopping-kafka-proxy')
+ try:
+ self.stopping = True
+ self.stop()
+ self.stopping = False
+ self.faulty = False
+ log.debug('stopped-kafka-proxy')
+ except Exception, e:
+ log.exception('failed-stopping-kafka-proxy', e=e)
+ pass
+ else:
+ log.info('already-stopping-kafka-proxy')
+
+ return
+
+ def is_faulty(self):
+ return self.faulty
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+ return KafkaProxy._kafka_instance