blob: 1bc77e039389e86bd08ee6f3470f95c58ee724cd [file] [log] [blame]
Shad Ansari42392a72019-04-09 22:44:18 -07001#
2# Copyright 2019 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20import structlog
21from scapy.layers.l2 import Ether, Dot1Q
22import binascii
23
24from common.frameio.frameio import hexify
25from voltha.protos.openflow_13_pb2 import PacketOut
26from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
27from voltha.core.flow_decomposer import OUTPUT
28from voltha.protos.device_pb2 import Port
29from voltha.adapters.openolt.protos import openolt_pb2
30
31
32class OpenoltPacket(object):
33 def __init__(self, device):
34 self.log = structlog.get_logger()
35 self.device = device
Shad Ansari8793c132019-05-18 00:02:27 +000036 self.packet_thread_handle = threading.Thread(
37 target=self.packet_thread)
38 self.packet_thread_handle.setDaemon(True)
Shad Ansari42392a72019-04-09 22:44:18 -070039
40 def start(self):
Shad Ansari8793c132019-05-18 00:02:27 +000041 self.packet_thread_handle.start()
Shad Ansari42392a72019-04-09 22:44:18 -070042
43 def stop(self):
Shad Ansari1ed6f7d2019-05-18 00:01:54 +000044 pass
Shad Ansari42392a72019-04-09 22:44:18 -070045
Shad Ansari8793c132019-05-18 00:02:27 +000046 def packet_thread(self):
Shad Ansari42392a72019-04-09 22:44:18 -070047 self.log.debug('openolt packet-out thread starting')
Shad Ansari8793c132019-05-18 00:02:27 +000048 KConsumer(self.packet_process,
Shad Ansaria3bcfe12019-04-13 11:46:28 -070049 'voltha.pktout-{}'.format(
50 self.device.data_model.logical_device_id))
Shad Ansari42392a72019-04-09 22:44:18 -070051
Shad Ansari8793c132019-05-18 00:02:27 +000052 def packet_process(self, topic, msg):
Shad Ansari42392a72019-04-09 22:44:18 -070053
54 def get_port_out(opo):
55 for action in opo.actions:
56 if action.type == OUTPUT:
57 return action.output.port
58
59 pb = Parse(loads(msg), PacketOut(), ignore_unknown_fields=True)
60
61 logical_device_id = pb.id
62 ofp_packet_out = pb.packet_out
63
64 self.log.debug("received packet-out form kafka",
65 logical_device_id=logical_device_id,
66 ofp_packet_out=ofp_packet_out)
67
68 egress_port = get_port_out(ofp_packet_out)
69 msg = ofp_packet_out.data
70
71 self.log.debug('rcv-packet-out', logical_device_id=logical_device_id,
72 egress_port=egress_port,
73 # adapter_name=self.adapter_name,
74 data=hexify(msg))
75
76 pkt = Ether(msg)
77 self.log.debug('packet out', egress_port=egress_port,
78 packet=str(pkt).encode("HEX"))
79
80 # Find port type
81 egress_port_type = self.device.platform \
82 .intf_id_to_port_type_name(egress_port)
83
84 if egress_port_type == Port.ETHERNET_UNI:
85
86 if pkt.haslayer(Dot1Q):
87 outer_shim = pkt.getlayer(Dot1Q)
88 if isinstance(outer_shim.payload, Dot1Q):
89 # If double tag, remove the outer tag
90 payload = (
91 Ether(src=pkt.src, dst=pkt.dst,
92 type=outer_shim.type) /
93 outer_shim.payload
94 )
95 else:
96 payload = pkt
97 else:
98 payload = pkt
99
100 send_pkt = binascii.unhexlify(str(payload).encode("HEX"))
101
102 self.log.debug(
103 'sending-packet-to-ONU', egress_port=egress_port,
104 intf_id=self.device.platform.intf_id_from_uni_port_num(
105 egress_port),
106 onu_id=self.device.platform.onu_id_from_port_num(egress_port),
107 uni_id=self.device.platform.uni_id_from_port_num(egress_port),
108 port_no=egress_port,
109 packet=str(payload).encode("HEX"))
110
111 onu_pkt = openolt_pb2.OnuPacket(
112 intf_id=self.device.platform.intf_id_from_uni_port_num(
113 egress_port),
114 onu_id=self.device.platform.onu_id_from_port_num(egress_port),
115 port_no=egress_port,
116 pkt=send_pkt)
117
Shad Ansari548f94d2019-04-24 13:42:52 -0700118 self.device.stub.OnuPacketOut(onu_pkt)
Shad Ansari42392a72019-04-09 22:44:18 -0700119
120 elif egress_port_type == Port.ETHERNET_NNI:
121 self.log.debug('sending-packet-to-uplink', egress_port=egress_port,
122 packet=str(pkt).encode("HEX"))
123
124 send_pkt = binascii.unhexlify(str(pkt).encode("HEX"))
125
126 uplink_pkt = openolt_pb2.UplinkPacket(
127 intf_id=self.device.platform.intf_id_from_nni_port_num(
128 egress_port),
129 pkt=send_pkt)
130
Shad Ansari548f94d2019-04-24 13:42:52 -0700131 self.device.stub.UplinkPacketOut(uplink_pkt)
Shad Ansari42392a72019-04-09 22:44:18 -0700132
133 else:
134 self.log.warn('Packet-out-to-this-interface-type-not-implemented',
135 egress_port=egress_port,
136 port_type=egress_port_type)