#
# Copyright 2019 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20from twisted.internet import reactor
21import structlog
22
23from voltha.adapters.openolt.protos import openolt_pb2
24from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
25
26
class OpenoltIndications(object):
    """Consume OpenOLT indications and hand them to the owning device.

    A background thread creates a ``KConsumer`` on the topic
    ``openolt.ind-<host>``, where ``<host>`` is the part of
    ``device.host_and_port`` before the first ``:``.  Each message is
    parsed into an ``openolt_pb2.Indication`` and the matching device
    handler is scheduled on the Twisted reactor thread.
    """

    def __init__(self, device):
        """Prepare (but do not start) the indications thread.

        :param device: the OLT device object.  This class reads its
            ``host_and_port`` and ``admin_state`` attributes and calls its
            ``*_indication`` handlers, ``stats_mgr`` and ``alarm_mgr``.
        """
        self.log = structlog.get_logger()
        self.device = device
        self.indications_thread_handle = threading.Thread(
            target=self.indications_thread)
        # Daemon thread, so it never keeps the process alive at shutdown.
        # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
        self.indications_thread_handle.daemon = True

    def start(self):
        """Start the background indications thread."""
        self.indications_thread_handle.start()

    def stop(self):
        """Do nothing; the daemon thread is neither signalled nor joined."""
        pass

    def indications_thread(self):
        """Thread target: create the consumer for this device's topic."""
        self.log.debug('openolt indications thread starting')

        # NOTE(review): KConsumer presumably blocks while consuming and
        # invokes the callback as callback(topic, msg) -- confirm against
        # openolt_kafka_consumer.
        KConsumer(self.indications_process,
                  'openolt.ind-{}'.format(
                      self.device.host_and_port.split(':')[0]))

    def _is_admin_down(self):
        """Return True when the device's admin state equals ``"down"``.

        Uses value equality: an identity test (``is``) against a string
        literal only succeeds when both sides happen to be the same
        interned object, which is not guaranteed for runtime strings.
        """
        return self.device.admin_state == "down"

    def indications_process(self, topic, msg):
        """Parse one indication message and dispatch it to the device.

        :param topic: topic the message arrived on (unused here).
        :param msg: serialized indication; it is decoded with
            ``simplejson.loads`` and the result is parsed as protobuf JSON
            into an ``openolt_pb2.Indication``.

        While the device is admin down every indication is dropped, except
        ``intf_oper_ind`` for an interface of type ``"nni"``, which is
        still dispatched.
        """
        ind = Parse(loads(msg), openolt_pb2.Indication(),
                    ignore_unknown_fields=True)

        self.log.debug("received openolt indication", ind=ind)

        if self._is_admin_down():
            if (ind.HasField('intf_oper_ind')
                    and ind.intf_oper_ind.type == "nni"):
                self.log.warn('olt is admin down, allow nni ind',
                              admin_state=self.device.admin_state,
                              indications=ind)
            else:
                self.log.warn('olt is admin down, ignore indication',
                              admin_state=self.device.admin_state,
                              indications=ind)
                return

        # indication handlers run in the main event loop: this method is
        # called off the reactor thread, so every handler is scheduled with
        # reactor.callFromThread rather than invoked directly.
        if ind.HasField('olt_ind'):
            reactor.callFromThread(self.device.olt_indication, ind.olt_ind)
        elif ind.HasField('intf_ind'):
            reactor.callFromThread(self.device.intf_indication, ind.intf_ind)
        elif ind.HasField('intf_oper_ind'):
            reactor.callFromThread(self.device.intf_oper_indication,
                                   ind.intf_oper_ind)
        elif ind.HasField('onu_disc_ind'):
            reactor.callFromThread(self.device.onu_discovery_indication,
                                   ind.onu_disc_ind)
        elif ind.HasField('onu_ind'):
            reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
        elif ind.HasField('omci_ind'):
            reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
        elif ind.HasField('port_stats'):
            reactor.callFromThread(
                self.device.stats_mgr.port_statistics_indication,
                ind.port_stats)
        elif ind.HasField('flow_stats'):
            reactor.callFromThread(
                self.device.stats_mgr.flow_statistics_indication,
                ind.flow_stats)
        elif ind.HasField('alarm_ind'):
            reactor.callFromThread(
                self.device.alarm_mgr.process_alarms, ind.alarm_ind)
        else:
            self.log.warn('unknown indication type')