Publish indications on Kafka
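
Instead of dispatching indications to the device handler directly from
the gRPC thread, publish them to per-type Kafka topics
(openolt.ind.olt, openolt.ind.intf, openolt.ind.intfoper,
openolt.ind.onudisc, openolt.ind.onu and openolt.ind.pkt). Messages are
serialized with MessageToJson(including_default_value_fields=True) so
fields left at their default values survive the round trip.

A new OpenoltIndications helper consumes these topics in a daemon
thread, parses each JSON payload back into its protobuf message and
forwards it to the existing handlers via the reactor. KConsumer now
takes the callback in its constructor, runs the blocking read loop
there and passes the topic along with each message. OMCI and
statistics indications are still dispatched directly from the gRPC
thread.

The ONU discovery alarm is no longer raised from
onu_discovery_indication, and post_init now sleeps 10 seconds before
the gRPC channel is created.
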
Change-Id: I4214e1de042109ff3ecf6871fe2d2d14a3476633
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 154f614..d750416 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -17,19 +17,14 @@
import grpc
import structlog
import time
-import threading
-from twisted.internet import reactor
from scapy.layers.l2 import Ether, Dot1Q
from transitions import Machine
-from google.protobuf.message import Message
-from simplejson import loads
from voltha.protos.device_pb2 import Port
from voltha.adapters.openolt.protos import openolt_pb2
from voltha.adapters.openolt.openolt_utils import OpenoltUtils
-from voltha.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
from voltha.adapters.openolt.openolt_grpc import OpenoltGrpc
-from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+from voltha.adapters.openolt.openolt_indications import OpenoltIndications
class OpenoltDevice(object):
@@ -113,14 +108,13 @@
def do_state_init(self, event):
self.log.debug('init')
- self.indications_thread_handle = threading.Thread(
- target=self.indications_thread)
- self.indications_thread_handle.setDaemon(True)
- self.indications_thread_handle.start()
+ self._indications = OpenoltIndications(self)
+ self._indications.start()
def post_init(self, event):
self.log.debug('post_init')
# Initialize gRPC
+ time.sleep(10)
self._grpc = OpenoltGrpc(self.host_and_port, self)
self.log.info('openolt-device-created')
@@ -162,21 +156,6 @@
self.log.debug('post_down')
self.flow_mgr.reset_flows()
- def indications_thread(self):
- self.log.debug('openolt indications thread starting')
- self.kafka_consumer = KConsumer(
- "openolt.ind.alarm",
- "openolt.ind.pkt",
- "openolt.ind.olt")
- self.log.debug('openolt indications thread processing')
- # block reading kafka
- self.kafka_consumer.read(self.indications_process)
- self.log.debug('openolt indications thread stopped')
-
- def indications_process(self, msg):
- ind = loads(msg)
- self.log.debug("openolt indication", ind=ind)
-
def olt_indication(self, olt_indication):
if olt_indication.oper_state == "up":
self.go_state_up()
@@ -209,15 +188,6 @@
self.log.debug("onu discovery indication", intf_id=intf_id,
serial_number=serial_number_str)
- # Post ONU Discover alarm 20180809_0805
- try:
- OnuDiscoveryAlarm(self.alarm_mgr.alarms, pon_id=intf_id,
- serial_number=serial_number_str).raise_alarm()
- except Exception as disc_alarm_error:
- self.log.exception("onu-discovery-alarm-error",
- errmsg=disc_alarm_error.message)
- # continue for now.
-
try:
onu_id = self.data_model.onu_id(serial_number=serial_number_str)
except ValueError:
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 93addd7..10055b7 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -17,10 +17,10 @@
import structlog
import grpc
import threading
+import time
from simplejson import dumps
from twisted.internet import reactor
from google.protobuf.json_format import MessageToJson
-from google.protobuf.message import Message
from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
@@ -61,8 +61,9 @@
self.log.debug('kafka-proxy-available')
kafka_proxy.send_message(
topic,
- dumps(MessageToJson(msg,
- preserving_proto_field_name=True)))
+ dumps(MessageToJson(
+ msg,
+ including_default_value_fields=True)))
else:
self.log.error('kafka-proxy-unavailable')
except Exception, e:
@@ -128,27 +129,20 @@
# indication handlers run in the main event loop
if ind.HasField('olt_ind'):
forward_indication("openolt.ind.olt", ind.olt_ind)
- reactor.callFromThread(
- self.device.olt_indication, ind.olt_ind)
elif ind.HasField('intf_ind'):
- reactor.callFromThread(
- self.device.intf_indication, ind.intf_ind)
+ forward_indication("openolt.ind.intf", ind.intf_ind)
elif ind.HasField('intf_oper_ind'):
- reactor.callFromThread(
- self.device.intf_oper_indication, ind.intf_oper_ind)
+ forward_indication("openolt.ind.intfoper",
+ ind.intf_oper_ind)
elif ind.HasField('onu_disc_ind'):
- reactor.callFromThread(
- self.device.onu_discovery_indication, ind.onu_disc_ind)
+ forward_indication("openolt.ind.onudisc", ind.onu_disc_ind)
elif ind.HasField('onu_ind'):
- reactor.callFromThread(
- self.device.onu_indication, ind.onu_ind)
+ forward_indication("openolt.ind.onu", ind.onu_ind)
elif ind.HasField('omci_ind'):
reactor.callFromThread(
self.device.omci_indication, ind.omci_ind)
elif ind.HasField('pkt_ind'):
forward_indication("openolt.ind.pkt", ind.pkt_ind)
- reactor.callFromThread(
- self.device.packet_indication, ind.pkt_ind)
elif ind.HasField('port_stats'):
reactor.callFromThread(
self.device.stats_mgr.port_statistics_indication,
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
new file mode 100644
index 0000000..8a51304
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -0,0 +1,78 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+from google.protobuf.json_format import Parse
+from simplejson import loads
+from twisted.internet import reactor
+import structlog
+
+from voltha.adapters.openolt.protos import openolt_pb2
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+
+
+class OpenoltIndications(object):
+ def __init__(self, device):
+ self.log = structlog.get_logger()
+ self.device = device
+ self.indications_thread_handle = threading.Thread(
+ target=self.indications_thread)
+ self.indications_thread_handle.setDaemon(True)
+
+ def start(self):
+ self.indications_thread_handle.start()
+
+ def stop(self):
+ pass
+
+ def indications_thread(self):
+ self.log.debug('openolt indications thread starting')
+        KConsumer(self.indications_process,
+                  "openolt.ind.olt",
+                  "openolt.ind.intf",
+                  "openolt.ind.intfoper",
+                  "openolt.ind.onudisc",
+                  "openolt.ind.onu",
+                  "openolt.ind.pkt")
+
+ def indications_process(self, topic, msg):
+ self.log.debug("received openolt indication", topic=topic, msg=msg)
+
+        if topic == "openolt.ind.olt":
+            pb = Parse(loads(msg), openolt_pb2.OltIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.olt_indication, pb)
+        elif topic == "openolt.ind.intf":
+            pb = Parse(loads(msg), openolt_pb2.IntfIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.intf_indication, pb)
+        elif topic == "openolt.ind.intfoper":
+            pb = Parse(loads(msg), openolt_pb2.IntfOperIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.intf_oper_indication, pb)
+        elif topic == "openolt.ind.onudisc":
+            pb = Parse(loads(msg), openolt_pb2.OnuDiscIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(
+                self.device.onu_discovery_indication, pb)
+        elif topic == "openolt.ind.onu":
+            pb = Parse(loads(msg), openolt_pb2.OnuIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.onu_indication, pb)
+        elif topic == "openolt.ind.pkt":
+            pb = Parse(loads(msg), openolt_pb2.PacketIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.packet_indication, pb)
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index adbe1d5..6d6e1f1 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -26,7 +26,7 @@
class KConsumer(object):
- def __init__(self, *topics):
+ def __init__(self, callback, *topics):
kafka_proxy = get_kafka_proxy()
if kafka_proxy and not kafka_proxy.is_faulty():
self.kafka_endpoint = kafka_proxy.kafka_endpoint
@@ -55,7 +55,6 @@
self.topics = list(topics)
self._c.subscribe(self.topics)
- def read(self, callback):
        # Read messages from Kafka and hand them to the callback
try:
while True:
@@ -65,7 +64,7 @@
continue
elif not msg.error():
log.debug('got a kafka message', topic=msg.topic())
- callback(msg.value())
+ callback(msg.topic(), msg.value())
elif msg.error().code() == KafkaError._PARTITION_EOF:
pass
else: