Initial commit to move grpc into openolt_grpc
Change-Id: Ic2e02a927c6425eb6051a4dc9a90afee4c8ebfcc
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 99e7612..49a6645 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -13,23 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import threading
import binascii
import grpc
import structlog
import time
+import threading
from twisted.internet import reactor
from scapy.layers.l2 import Ether, Dot1Q
from transitions import Machine
-from simplejson import dumps
from google.protobuf.message import Message
-from google.protobuf.json_format import MessageToDict
+from simplejson import loads
from voltha.protos.device_pb2 import Port
-from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
+from voltha.adapters.openolt.protos import openolt_pb2
from voltha.adapters.openolt.openolt_utils import OpenoltUtils
from voltha.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.adapters.openolt.openolt_grpc import OpenoltGrpc
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
class OpenoltDevice(object):
@@ -108,33 +108,22 @@
self.device_info = None
+ self._grpc = None
self.go_state_init()
def do_state_init(self, event):
- # Initialize gRPC
- self.channel = grpc.insecure_channel(self.host_and_port)
- self.channel_ready_future = grpc.channel_ready_future(self.channel)
-
- self.log.info('openolt-device-created')
+ self.log.debug('init')
+ self.indications_thread_handle = threading.Thread(
+ target=self.indications_thread)
+ self.indications_thread_handle.setDaemon(True)
+ self.indications_thread_handle.start()
def post_init(self, event):
self.log.debug('post_init')
+ # Initialize gRPC
+ self._grpc = OpenoltGrpc(self.host_and_port, self)
- # We have reached init state, starting the indications thread
-
- # Catch RuntimeError exception
- try:
- # Start indications thread
- self.indications_thread_handle = threading.Thread(
- target=self.indications_thread)
- # Old getter/setter API for daemon; use it directly as a
- # property instead. The Jinkins error will happon on the reason of
- # Exception in thread Thread-1 (most likely raised # during
- # interpreter shutdown)
- self.indications_thread_handle.setDaemon(True)
- self.indications_thread_handle.start()
- except Exception as e:
- self.log.exception('post_init failed', e=e)
+ self.log.info('openolt-device-created')
def do_state_connected(self, event):
self.log.debug("do_state_connected")
@@ -150,7 +139,7 @@
self.host_and_port,
self.extra_args,
self.device_info)
- self.flow_mgr = self.flow_mgr_class(self.log, self.stub,
+ self.flow_mgr = self.flow_mgr_class(self.log, self._grpc.stub,
self.device_id,
self.data_model.logical_device_id,
self.platform, self.resource_mgr,
@@ -174,108 +163,18 @@
self.flow_mgr.reset_flows()
def indications_thread(self):
+ self.log.debug('openolt indications thread starting')
+ self.kafka_consumer = KConsumer(
+ "openolt.ind.alarm",
+ "openolt.ind.pkt",
+ "openolt.ind.olt")
+ self.log.debug('openolt indications thread processing')
+ self.kafka_consumer.read(self.indications_process)
+ self.log.debug('alarm indications thread exited')
- def forward_indication(topic, msg):
- try:
- kafka_proxy = get_kafka_proxy()
- if kafka_proxy and not kafka_proxy.is_faulty():
- self.log.debug('kafka-proxy-available')
- # convert to JSON string if msg is a protobuf msg
- if isinstance(msg, Message):
- msg = dumps(MessageToDict(msg, True, True))
- kafka_proxy.send_message(topic, dumps(msg))
- else:
- self.log.error('kafka-proxy-unavailable')
- except Exception, e:
- self.log.exception('failed-sending-message', e=e)
-
- self.log.debug('starting-indications-thread')
- self.log.debug('connecting to olt')
-
- self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
-
- timeout = 60*60
- delay = 1
- exponential_back_off = False
- while True:
- try:
- self.device_info = self.stub.GetDeviceInfo(openolt_pb2.Empty())
- break
- except Exception as e:
- if delay > timeout:
- self.log.error("timed out connecting to olt")
- return
- else:
- self.log.warn("retry connecting to olt in %ds: %s"
- % (delay, repr(e)))
- time.sleep(delay)
- if exponential_back_off:
- delay += delay
- else:
- delay += 1
-
- self.log.info('connected to olt', device_info=self.device_info)
-
- self.go_state_connected()
-
- self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
-
- while True:
- try:
- # get the next indication from olt
- ind = next(self.indications)
- except Exception as e:
- self.log.warn('gRPC connection lost', error=e)
- reactor.callFromThread(self.go_state_down)
- reactor.callFromThread(self.go_state_init)
- break
- else:
- self.log.debug("rx indication", indication=ind)
-
- if self.admin_state is "down":
- if ind.HasField('intf_oper_ind') \
- and (ind.intf_oper_ind.type == "nni"):
- self.log.warn('olt is admin down, allow nni ind',
- admin_state=self.admin_state,
- indications=ind)
- else:
- self.log.warn('olt is admin down, ignore indication',
- admin_state=self.admin_state,
- indications=ind)
- continue
-
-
- # indication handlers run in the main event loop
- if ind.HasField('olt_ind'):
- reactor.callFromThread(self.olt_indication, ind.olt_ind)
- elif ind.HasField('intf_ind'):
- reactor.callFromThread(self.intf_indication, ind.intf_ind)
- elif ind.HasField('intf_oper_ind'):
- reactor.callFromThread(self.intf_oper_indication,
- ind.intf_oper_ind)
- elif ind.HasField('onu_disc_ind'):
- reactor.callFromThread(self.onu_discovery_indication,
- ind.onu_disc_ind)
- elif ind.HasField('onu_ind'):
- reactor.callFromThread(self.onu_indication, ind.onu_ind)
- elif ind.HasField('omci_ind'):
- reactor.callFromThread(self.omci_indication, ind.omci_ind)
- elif ind.HasField('pkt_ind'):
- reactor.callFromThread(self.packet_indication, ind.pkt_ind)
- elif ind.HasField('port_stats'):
- reactor.callFromThread(
- self.stats_mgr.port_statistics_indication,
- ind.port_stats)
- elif ind.HasField('flow_stats'):
- reactor.callFromThread(
- self.stats_mgr.flow_statistics_indication,
- ind.flow_stats)
- elif ind.HasField('alarm_ind'):
- forward_indication("openolt.ind.alarm", ind)
- reactor.callFromThread(self.alarm_mgr.process_alarms,
- ind.alarm_ind)
- else:
- self.log.warn('unknown indication type')
+ def indications_process(self, msg):
+ ind = loads(msg)
+ self.log.debug("openolt indication", ind=ind)
def olt_indication(self, olt_indication):
if olt_indication.oper_state == "up":
@@ -420,7 +319,7 @@
port_no=egress_port,
pkt=send_pkt)
- self.stub.OnuPacketOut(onu_pkt)
+ self._grpc.stub.OnuPacketOut(onu_pkt)
elif egress_port_type == Port.ETHERNET_NNI:
self.log.debug('sending-packet-to-uplink', egress_port=egress_port,
@@ -432,7 +331,7 @@
intf_id=self.platform.intf_id_from_nni_port_num(egress_port),
pkt=send_pkt)
- self.stub.UplinkPacketOut(uplink_pkt)
+ self._grpc.stub.UplinkPacketOut(uplink_pkt)
else:
self.log.warn('Packet-out-to-this-interface-type-not-implemented',
@@ -442,7 +341,7 @@
def send_proxied_message(self, proxy_address, msg):
omci = openolt_pb2.OmciMsg(intf_id=proxy_address.channel_id,
onu_id=proxy_address.onu_id, pkt=str(msg))
- self.stub.OmciMsgOut(omci)
+ self._grpc.stub.OmciMsgOut(omci)
def update_flow_table(self, flows):
self.log.debug('No updates here now, all is done in logical flows '
@@ -464,7 +363,7 @@
try:
# Send grpc call
- self.stub.DisableOlt(openolt_pb2.Empty())
+ self._grpc.stub.DisableOlt(openolt_pb2.Empty())
self.admin_state = "down"
self.log.info('openolt device disabled')
except Exception as e:
@@ -491,7 +390,7 @@
self.log.debug('reenabling-olt')
try:
- self.stub.ReenableOlt(openolt_pb2.Empty())
+ self._grpc.stub.ReenableOlt(openolt_pb2.Empty())
except Exception as e:
self.log.error('Failure to reenable openolt device', error=e)
else:
@@ -506,7 +405,7 @@
onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id,
serial_number=serial_number)
try:
- self.stub.ActivateOnu(onu)
+ self._grpc.stub.ActivateOnu(onu)
except grpc.RpcError as grpc_e:
if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
self.log.info('onu activation in progress',
@@ -555,14 +454,14 @@
intf_id=child_device.proxy_address.channel_id,
onu_id=child_device.proxy_address.onu_id,
serial_number=serial_number)
- self.stub.DeleteOnu(onu)
+ self._grpc.stub.DeleteOnu(onu)
except Exception as e:
self.log.exception("error-deleting-the-onu-on-olt-device", error=e)
def reboot(self):
self.log.debug('rebooting openolt device')
try:
- self.stub.Reboot(openolt_pb2.Empty())
+ self._grpc.stub.Reboot(openolt_pb2.Empty())
except Exception as e:
self.log.error('something went wrong with the reboot', error=e)
else:
@@ -570,7 +469,7 @@
def trigger_statistics_collection(self):
try:
- self.stub.CollectStatistics(openolt_pb2.Empty())
+ self._grpc.stub.CollectStatistics(openolt_pb2.Empty())
except Exception as e:
self.log.error('Error while triggering statistics collection',
error=e)
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
new file mode 100644
index 0000000..436503b
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -0,0 +1,169 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+import grpc
+import threading
+from simplejson import dumps
+from twisted.internet import reactor
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
+
+
+class OpenoltGrpc(object):
+ def __init__(self, host_and_port, device):
+ super(OpenoltGrpc, self).__init__()
+ self.log = structlog.get_logger()
+ self.log.debug('openolt grpc init')
+ self.device = device
+ self.host_and_port = host_and_port
+ self.channel = grpc.insecure_channel(self.host_and_port)
+ self.channel_ready_future = grpc.channel_ready_future(self.channel)
+
+ try:
+ # Start indications thread
+ self.log.debug('openolt grpc starting')
+ self.indications_thread_handle = threading.Thread(
+ target=self.indications_thread)
+ # Old getter/setter API for daemon; use it directly as a
+ # property instead. The Jinkins error will happon on the reason of
+ # Exception in thread Thread-1 (most likely raised # during
+ # interpreter shutdown)
+ self.indications_thread_handle.setDaemon(True)
+ self.indications_thread_handle.start()
+ except Exception as e:
+ self.log.exception('indication start failed', e=e)
+ else:
+ self.log.debug('openolt grpc started')
+
+ def indications_thread(self):
+
+ def forward_indication(topic, msg):
+ try:
+ self.log.debug('forward indication', topic=topic, msg=msg)
+ kafka_proxy = get_kafka_proxy()
+ if kafka_proxy and not kafka_proxy.is_faulty():
+ self.log.debug('kafka-proxy-available')
+ self.log.debug('shad 1', topic=topic, msg=msg)
+ # convert to JSON string if msg is a protobuf msg
+ if isinstance(msg, Message):
+ self.log.debug('shad 2', topic=topic, msg=msg)
+ msg = dumps(MessageToDict(msg, True, True))
+ self.log.debug('shad 3', topic=topic, msg=msg)
+ kafka_proxy.send_message(topic, dumps(msg))
+ else:
+ self.log.error('kafka-proxy-unavailable')
+ except Exception, e:
+ self.log.exception('failed-sending-message', e=e)
+ self.log.debug('shad 4', topic=topic, msg=msg)
+
+ self.log.debug('openolt grpc connecting to olt')
+
+ self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
+
+ timeout = 60*60
+ delay = 1
+ exponential_back_off = False
+ while True:
+ try:
+ self.device.device_info \
+ = self.stub.GetDeviceInfo(openolt_pb2.Empty())
+ break
+ except Exception as e:
+ if delay > timeout:
+ self.log.error("openolt grpc timed out connecting to olt")
+ return
+ else:
+ self.log.warn(
+ "openolt grpc retry connecting to olt in %ds: %s"
+ % (delay, repr(e)))
+ time.sleep(delay)
+ if exponential_back_off:
+ delay += delay
+ else:
+ delay += 1
+
+ self.log.info('openolt grpc connected to olt',
+ device_info=self.device.device_info)
+
+ self.device.go_state_connected()
+
+ self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
+
+ while True:
+ try:
+ # get the next indication from olt
+ ind = next(self.indications)
+ except Exception as e:
+ self.log.warn('openolt grpc connection lost', error=e)
+ reactor.callFromThread(self.device.go_state_down)
+ reactor.callFromThread(self.device.go_state_init)
+ break
+ else:
+ self.log.debug("openolt grpc rx indication", indication=ind)
+
+ if self.device.admin_state is "down":
+ if ind.HasField('intf_oper_ind') \
+ and (ind.intf_oper_ind.type == "nni"):
+ self.log.warn('olt is admin down, allow nni ind',
+ admin_state=self.device.admin_state,
+ indications=ind)
+ else:
+ self.log.warn('olt is admin down, ignore indication',
+ admin_state=self.admin_state,
+ indications=ind)
+ continue
+
+ # indication handlers run in the main event loop
+ if ind.HasField('olt_ind'):
+ forward_indication("openolt.ind.olt", ind.olt_ind)
+ reactor.callFromThread(
+ self.device.olt_indication, ind.olt_ind)
+ elif ind.HasField('intf_ind'):
+ reactor.callFromThread(
+ self.device.intf_indication, ind.intf_ind)
+ elif ind.HasField('intf_oper_ind'):
+ reactor.callFromThread(
+ self.device.intf_oper_indication, ind.intf_oper_ind)
+ elif ind.HasField('onu_disc_ind'):
+ reactor.callFromThread(
+ self.device.onu_discovery_indication, ind.onu_disc_ind)
+ elif ind.HasField('onu_ind'):
+ reactor.callFromThread(
+ self.device.onu_indication, ind.onu_ind)
+ elif ind.HasField('omci_ind'):
+ reactor.callFromThread(
+ self.device.omci_indication, ind.omci_ind)
+ elif ind.HasField('pkt_ind'):
+ forward_indication("openolt.ind.pkt", ind.pkt_ind)
+ reactor.callFromThread(
+ self.device.packet_indication, ind.pkt_ind)
+ elif ind.HasField('port_stats'):
+ reactor.callFromThread(
+ self.device.stats_mgr.port_statistics_indication,
+ ind.port_stats)
+ elif ind.HasField('flow_stats'):
+ reactor.callFromThread(
+ self.device.stats_mgr.flow_statistics_indication,
+ ind.flow_stats)
+ elif ind.HasField('alarm_ind'):
+ forward_indication("openolt.ind.alarm", ind.alarm_ind)
+ reactor.callFromThread(
+ self.device.alarm_mgr.process_alarms, ind.alarm_ind)
+ else:
+ self.log.warn('unknown indication type')