blob: 48739842acd14a497ef6c25781cea13143837c7e [file] [log] [blame]
Shad Ansarie969afc2019-04-05 15:16:41 -07001#
2# Copyright 2019 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20from twisted.internet import reactor
21import structlog
Shad Ansari995ca632019-04-08 19:43:46 -070022from scapy.layers.l2 import Ether, Packet
23from common.frameio.frameio import hexify
Shad Ansarie969afc2019-04-05 15:16:41 -070024
25from voltha.adapters.openolt.protos import openolt_pb2
26from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
27
28
29class OpenoltIndications(object):
30 def __init__(self, device):
31 self.log = structlog.get_logger()
32 self.device = device
33 self.indications_thread_handle = threading.Thread(
34 target=self.indications_thread)
35 self.indications_thread_handle.setDaemon(True)
36
37 def start(self):
38 self.indications_thread_handle.start()
39
40 def stop(self):
41 pass
42
43 def indications_thread(self):
44 self.log.debug('openolt indications thread starting')
Shad Ansaria3bcfe12019-04-13 11:46:28 -070045
46 KConsumer(self.indications_process,
47 'openolt.ind-{}'.format(
48 self.device.host_and_port.split(':')[0]))
Shad Ansarie969afc2019-04-05 15:16:41 -070049
50 def indications_process(self, topic, msg):
Shad Ansarie969afc2019-04-05 15:16:41 -070051
Shad Ansari929e6a02019-04-06 23:41:42 -070052 ind = Parse(loads(msg), openolt_pb2.Indication(),
53 ignore_unknown_fields=True)
54
55 self.log.debug("received openolt indication", ind=ind)
56
57 # indication handlers run in the main event loop
58 if ind.HasField('olt_ind'):
59 reactor.callFromThread(self.device.olt_indication, ind.olt_ind)
60 elif ind.HasField('intf_ind'):
61 reactor.callFromThread(self.device.intf_indication, ind.intf_ind)
62 elif ind.HasField('intf_oper_ind'):
63 reactor.callFromThread(self.device.intf_oper_indication,
64 ind.intf_oper_ind)
65 elif ind.HasField('onu_disc_ind'):
66 reactor.callFromThread(self.device.onu_discovery_indication,
67 ind.onu_disc_ind)
68 elif ind.HasField('onu_ind'):
69 reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
70 elif ind.HasField('omci_ind'):
71 reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
72 elif ind.HasField('pkt_ind'):
Shad Ansari995ca632019-04-08 19:43:46 -070073 self.send_packet_in(ind.pkt_ind)
Shad Ansari929e6a02019-04-06 23:41:42 -070074 elif ind.HasField('port_stats'):
Shad Ansarie969afc2019-04-05 15:16:41 -070075 reactor.callFromThread(
Shad Ansari929e6a02019-04-06 23:41:42 -070076 self.device.stats_mgr.port_statistics_indication,
77 ind.port_stats)
78 elif ind.HasField('flow_stats'):
79 reactor.callFromThread(
80 self.device.stats_mgr.flow_statistics_indication,
81 ind.flow_stats)
82 elif ind.HasField('alarm_ind'):
83 reactor.callFromThread(
84 self.device.alarm_mgr.process_alarms, ind.alarm_ind)
85 else:
86 self.log.warn('unknown indication type')
Shad Ansari995ca632019-04-08 19:43:46 -070087
88 def send_packet_in(self, pkt_indication):
89 self.log.debug("packet indication",
90 intf_type=pkt_indication.intf_type,
91 intf_id=pkt_indication.intf_id,
92 port_no=pkt_indication.port_no,
93 cookie=pkt_indication.cookie,
94 gemport_id=pkt_indication.gemport_id,
95 flow_id=pkt_indication.flow_id)
96 try:
97 logical_port_num = self.device.data_model.logical_port_num(
98 pkt_indication.intf_type,
99 pkt_indication.intf_id,
100 pkt_indication.port_no,
101 pkt_indication.gemport_id)
102 except ValueError:
103 self.log.error('No logical port found',
104 intf_type=pkt_indication.intf_type,
105 intf_id=pkt_indication.intf_id,
106 port_no=pkt_indication.port_no,
107 gemport_id=pkt_indication.gemport_id)
108 return
109
110 ether_pkt = Ether(pkt_indication.pkt)
111
112 if isinstance(ether_pkt, Packet):
113 ether_pkt = str(ether_pkt)
114
115 logical_device_id = self.device.data_model.logical_device_id
116 topic = 'packet-in:' + logical_device_id
117
118 self.log.debug('send-packet-in', logical_device_id=logical_device_id,
119 logical_port_num=logical_port_num,
120 packet=hexify(ether_pkt))
121
122 self.device.data_model.adapter_agent.event_bus.publish(
123 topic, (logical_port_num, str(ether_pkt)))