blob: e1299db209ed8a1c508e9edb372af7d8c19f77e8 [file] [log] [blame]
Shad Ansari0cc92302019-04-03 11:34:49 -07001#
2# Copyright 2019 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import structlog
18import grpc
19import threading
Shad Ansari0cc92302019-04-03 11:34:49 -070020from twisted.internet import reactor
Shad Ansari42392a72019-04-09 22:44:18 -070021from voltha.northbound.kafka.kafka_proxy import kafka_send_pb
Shad Ansari0cc92302019-04-03 11:34:49 -070022from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
23
24
25class OpenoltGrpc(object):
26 def __init__(self, host_and_port, device):
27 super(OpenoltGrpc, self).__init__()
28 self.log = structlog.get_logger()
29 self.log.debug('openolt grpc init')
30 self.device = device
31 self.host_and_port = host_and_port
32 self.channel = grpc.insecure_channel(self.host_and_port)
33 self.channel_ready_future = grpc.channel_ready_future(self.channel)
Shad Ansari72462c82019-04-17 01:36:01 -070034 self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
Shad Ansari0cc92302019-04-03 11:34:49 -070035
Shad Ansari72462c82019-04-17 01:36:01 -070036 def start(self):
Shad Ansari0cc92302019-04-03 11:34:49 -070037 try:
38 # Start indications thread
39 self.log.debug('openolt grpc starting')
40 self.indications_thread_handle = threading.Thread(
41 target=self.indications_thread)
42 # Old getter/setter API for daemon; use it directly as a
43 # property instead. The Jinkins error will happon on the reason of
44 # Exception in thread Thread-1 (most likely raised # during
45 # interpreter shutdown)
46 self.indications_thread_handle.setDaemon(True)
47 self.indications_thread_handle.start()
48 except Exception as e:
49 self.log.exception('indication start failed', e=e)
50 else:
51 self.log.debug('openolt grpc started')
52
53 def indications_thread(self):
54
Shad Ansari0cc92302019-04-03 11:34:49 -070055 self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
56
57 while True:
58 try:
59 # get the next indication from olt
60 ind = next(self.indications)
61 except Exception as e:
62 self.log.warn('openolt grpc connection lost', error=e)
63 reactor.callFromThread(self.device.go_state_down)
64 reactor.callFromThread(self.device.go_state_init)
65 break
66 else:
67 self.log.debug("openolt grpc rx indication", indication=ind)
68
69 if self.device.admin_state is "down":
70 if ind.HasField('intf_oper_ind') \
71 and (ind.intf_oper_ind.type == "nni"):
72 self.log.warn('olt is admin down, allow nni ind',
73 admin_state=self.device.admin_state,
74 indications=ind)
75 else:
76 self.log.warn('olt is admin down, ignore indication',
77 admin_state=self.admin_state,
78 indications=ind)
79 continue
80
Shad Ansaria3bcfe12019-04-13 11:46:28 -070081 topic = 'openolt.ind-{}'.format(
82 self.device.host_and_port.split(':')[0])
83 kafka_send_pb(topic, ind)