blob: 75f905dfc3acf089183386c42e648703dfbb65f6 [file] [log] [blame]
#
# Copyright 2019 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20from twisted.internet import reactor
21import structlog
Shad Ansari8793c132019-05-18 00:02:27 +000022from scapy.layers.l2 import Ether, Packet
23from common.frameio.frameio import hexify
Shad Ansarie969afc2019-04-05 15:16:41 -070024
25from voltha.adapters.openolt.protos import openolt_pb2
26from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
27
28
class OpenoltIndications(object):
    """Receive OpenOLT indications and hand them to the owning device.

    A background thread runs a ``KConsumer`` subscribed to the topic
    ``openolt.ind-<host>``, where ``<host>`` is the part of
    ``device.host_and_port`` before the first ``:``.  Every message is
    parsed into an ``openolt_pb2.Indication`` and routed to the matching
    handler on ``device``.
    """

    def __init__(self, device):
        """Create the (not yet started) indications thread.

        :param device: object the indications are delivered to.  This class
            reads its ``host_and_port``, ``admin_state``, ``data_model``,
            ``stats_mgr`` and ``alarm_mgr`` attributes and calls its
            ``*_indication`` handlers.
        """
        self.log = structlog.get_logger()
        self.device = device
        self.indications_thread_handle = threading.Thread(
            target=self.indications_thread)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.indications_thread_handle.daemon = True

    def start(self):
        """Start the background indications thread."""
        self.indications_thread_handle.start()

    def stop(self):
        """Do nothing; the consumer thread is not stopped by this method."""
        pass

    def indications_thread(self):
        """Thread target: create the Kafka consumer for this device's topic.

        ``indications_process`` is passed to ``KConsumer`` as the callback.
        NOTE(review): presumably ``KConsumer(...)`` blocks and invokes the
        callback for each message - confirm against ``KConsumer``.
        """
        self.log.debug('openolt indications thread starting')

        olt_host = self.device.host_and_port.split(':')[0]
        KConsumer(self.indications_process,
                  'openolt.ind-{}'.format(olt_host))

    def indications_process(self, topic, msg):
        """Parse one message and dispatch it to the matching handler.

        :param topic: topic the message arrived on (unused here).
        :param msg: payload; it is decoded with ``simplejson.loads`` and the
            result is given to protobuf's JSON ``Parse`` to build an
            ``openolt_pb2.Indication`` (unknown fields are ignored).

        While the device is administratively down, only NNI
        ``intf_oper_ind`` indications are processed; all others are dropped.
        """
        ind = Parse(loads(msg), openolt_pb2.Indication(),
                    ignore_unknown_fields=True)

        self.log.debug("received openolt indication", ind=ind)

        if self._ignored_while_admin_down(ind):
            return

        device = self.device

        # indication handlers run in the main event loop
        if ind.HasField('olt_ind'):
            reactor.callFromThread(device.olt_indication, ind.olt_ind)
        elif ind.HasField('intf_ind'):
            reactor.callFromThread(device.intf_indication, ind.intf_ind)
        elif ind.HasField('intf_oper_ind'):
            reactor.callFromThread(device.intf_oper_indication,
                                   ind.intf_oper_ind)
        elif ind.HasField('onu_disc_ind'):
            reactor.callFromThread(device.onu_discovery_indication,
                                   ind.onu_disc_ind)
        elif ind.HasField('onu_ind'):
            reactor.callFromThread(device.onu_indication, ind.onu_ind)
        elif ind.HasField('omci_ind'):
            reactor.callFromThread(device.omci_indication, ind.omci_ind)
        elif ind.HasField('pkt_ind'):
            # Unlike the other handlers, packet-in is processed directly in
            # the calling thread rather than via the reactor.
            self.send_packet_in(ind.pkt_ind)
        elif ind.HasField('port_stats'):
            reactor.callFromThread(
                device.stats_mgr.port_statistics_indication,
                ind.port_stats)
        elif ind.HasField('flow_stats'):
            reactor.callFromThread(
                device.stats_mgr.flow_statistics_indication,
                ind.flow_stats)
        elif ind.HasField('alarm_ind'):
            reactor.callFromThread(
                device.alarm_mgr.process_alarms, ind.alarm_ind)
        else:
            self.log.warn('unknown indication type')

    def _ignored_while_admin_down(self, ind):
        """Return True when ``ind`` must be dropped because the OLT is down.

        The admin state is compared by value (``==``).  An identity test
        against the literal ``"down"`` only succeeds when both strings happen
        to be the same interned object, so an equal but distinct string would
        silently bypass this gate.

        :param ind: parsed indication; only ``HasField`` and
            ``intf_oper_ind.type`` are consulted.
        :returns: ``False`` if the device is not admin down or ``ind`` is an
            NNI ``intf_oper_ind``; ``True`` otherwise.
        """
        admin_state = self.device.admin_state
        if admin_state != "down":
            return False

        if ind.HasField('intf_oper_ind') \
                and ind.intf_oper_ind.type == "nni":
            self.log.warn('olt is admin down, allow nni ind',
                          admin_state=admin_state,
                          indications=ind)
            return False

        self.log.warn('olt is admin down, ignore indication',
                      admin_state=admin_state,
                      indications=ind)
        return True

    def send_packet_in(self, pkt_indication):
        """Publish a received packet on the logical device's packet-in topic.

        The logical port number is resolved through
        ``device.data_model.logical_port_num``; when that raises
        ``ValueError`` an error is logged and the packet is dropped.
        Otherwise ``(logical_port_num, frame)`` is published on the event bus
        under ``'packet-in:' + logical_device_id``.

        :param pkt_indication: packet indication providing ``intf_type``,
            ``intf_id``, ``port_no``, ``cookie``, ``gemport_id``, ``flow_id``
            and the payload ``pkt``.
        """
        self.log.debug("packet indication",
                       intf_type=pkt_indication.intf_type,
                       intf_id=pkt_indication.intf_id,
                       port_no=pkt_indication.port_no,
                       cookie=pkt_indication.cookie,
                       gemport_id=pkt_indication.gemport_id,
                       flow_id=pkt_indication.flow_id)
        data_model = self.device.data_model
        try:
            logical_port_num = data_model.logical_port_num(
                pkt_indication.intf_type,
                pkt_indication.intf_id,
                pkt_indication.port_no,
                pkt_indication.gemport_id)
        except ValueError:
            self.log.error('No logical port found',
                           intf_type=pkt_indication.intf_type,
                           intf_id=pkt_indication.intf_id,
                           port_no=pkt_indication.port_no,
                           gemport_id=pkt_indication.gemport_id)
            return

        # Ether(...) always yields a scapy Packet, so it is converted to its
        # string form once and that value is reused for logging and publish.
        frame = str(Ether(pkt_indication.pkt))

        logical_device_id = data_model.logical_device_id
        topic = 'packet-in:' + logical_device_id

        self.log.debug('send-packet-in', logical_device_id=logical_device_id,
                       logical_port_num=logical_port_num,
                       packet=hexify(frame))

        data_model.adapter_agent.event_bus.publish(
            topic, (logical_port_num, frame))