blob: cfd838c31427abd786be5c606350f98bdf42e2ca [file] [log] [blame]
Shad Ansari42392a72019-04-09 22:44:18 -07001#
2# Copyright 2019 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20import structlog
21from scapy.layers.l2 import Ether, Dot1Q
22import binascii
23
24from common.frameio.frameio import hexify
25from voltha.protos.openflow_13_pb2 import PacketOut
26from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
27from voltha.core.flow_decomposer import OUTPUT
28from voltha.protos.device_pb2 import Port
29from voltha.adapters.openolt.protos import openolt_pb2
30
31
class OpenoltPacket(object):
    """Forward packet-out messages received over Kafka to the OLT.

    A daemon thread runs a ``KConsumer`` on the ``voltha.pktout`` topic with
    ``packet_process`` as its callback. Each message is parsed into a
    ``PacketOut`` protobuf and the frame it carries is sent through the
    device's gRPC stub: to an ONU when the egress port is a UNI port, or to
    the uplink when it is an NNI port.
    """

    def __init__(self, device):
        """Create the consumer thread without starting it.

        :param device: owning device object. This class reads its
            ``platform`` helper and ``_grpc.stub`` when forwarding packets.
        """
        self.log = structlog.get_logger()
        self.device = device
        self.packet_thread_handle = threading.Thread(
            target=self.packet_thread)
        # Daemon thread, so it never keeps the interpreter alive on exit.
        # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
        self.packet_thread_handle.daemon = True

    def start(self):
        """Start the packet-out consumer thread."""
        self.packet_thread_handle.start()

    def stop(self):
        """Do nothing; the consumer thread is not stopped by this call."""
        pass

    def packet_thread(self):
        """Thread body: attach ``packet_process`` to the packet-out topic."""
        self.log.debug('openolt packet-out thread starting')
        # NOTE(review): KConsumer presumably blocks while consuming and
        # invokes the callback once per message -- confirm in
        # openolt_kafka_consumer.
        KConsumer(self.packet_process, 'voltha.pktout')

    def packet_process(self, topic, msg):
        """Handle one packet-out message and send its frame to the OLT.

        :param topic: topic the message arrived on (not used here).
        :param msg: message payload; it is decoded with ``simplejson.loads``
            and the result is handed to protobuf's ``json_format.Parse`` to
            build a ``PacketOut``.
        """
        pb = Parse(loads(msg), PacketOut(), ignore_unknown_fields=True)

        logical_device_id = pb.id
        ofp_packet_out = pb.packet_out

        self.log.debug("received packet-out form kafka",
                       logical_device_id=logical_device_id,
                       ofp_packet_out=ofp_packet_out)

        egress_port = self._get_port_out(ofp_packet_out)
        frame = ofp_packet_out.data

        self.log.debug('rcv-packet-out', logical_device_id=logical_device_id,
                       egress_port=egress_port,
                       data=hexify(frame))

        pkt = Ether(frame)
        self.log.debug('packet out', egress_port=egress_port,
                       packet=self._packet_hex(pkt))

        if egress_port is None:
            # No OUTPUT action means there is nowhere to send the frame.
            # Drop it explicitly instead of handing None to the platform
            # port helpers.
            self.log.warn('packet-out-without-output-action',
                          logical_device_id=logical_device_id)
            return

        # Find port type
        egress_port_type = self.device.platform.intf_id_to_port_type_name(
            egress_port)

        if egress_port_type == Port.ETHERNET_UNI:
            self._send_to_onu(pkt, egress_port)
        elif egress_port_type == Port.ETHERNET_NNI:
            self._send_to_uplink(pkt, egress_port)
        else:
            self.log.warn('Packet-out-to-this-interface-type-not-implemented',
                          egress_port=egress_port,
                          port_type=egress_port_type)

    @staticmethod
    def _get_port_out(ofp_packet_out):
        """Return the port of the first OUTPUT action, or None if absent."""
        for action in ofp_packet_out.actions:
            if action.type == OUTPUT:
                return action.output.port
        return None

    @staticmethod
    def _packet_hex(pkt):
        """Return the hex encoding of *pkt*'s serialized bytes, for logging.

        ``bytes(pkt)`` is the same call as ``str(pkt)`` on Python 2, so this
        yields what ``str(pkt).encode("HEX")`` did, without depending on the
        Python-2-only "HEX" codec.
        """
        return binascii.hexlify(bytes(pkt))

    @staticmethod
    def _strip_outer_tag(pkt):
        """Return *pkt* without its outer VLAN tag when it is double tagged.

        Single-tagged and untagged packets are returned unchanged.
        """
        if pkt.haslayer(Dot1Q):
            outer_shim = pkt.getlayer(Dot1Q)
            if isinstance(outer_shim.payload, Dot1Q):
                # Rebuild the Ethernet header with the outer tag's type and
                # keep everything from the inner tag onwards.
                return (
                    Ether(src=pkt.src, dst=pkt.dst, type=outer_shim.type) /
                    outer_shim.payload
                )
        return pkt

    def _send_to_onu(self, pkt, egress_port):
        """Send *pkt* to the ONU addressed by UNI port *egress_port*."""
        payload = self._strip_outer_tag(pkt)
        # Serialize once; the same bytes go on the wire and into the log.
        send_pkt = bytes(payload)

        platform = self.device.platform
        intf_id = platform.intf_id_from_uni_port_num(egress_port)
        onu_id = platform.onu_id_from_port_num(egress_port)

        self.log.debug(
            'sending-packet-to-ONU', egress_port=egress_port,
            intf_id=intf_id,
            onu_id=onu_id,
            uni_id=platform.uni_id_from_port_num(egress_port),
            port_no=egress_port,
            packet=self._packet_hex(send_pkt))

        onu_pkt = openolt_pb2.OnuPacket(
            intf_id=intf_id,
            onu_id=onu_id,
            port_no=egress_port,
            pkt=send_pkt)

        self.device._grpc.stub.OnuPacketOut(onu_pkt)

    def _send_to_uplink(self, pkt, egress_port):
        """Send *pkt* out of the NNI (uplink) port *egress_port*."""
        send_pkt = bytes(pkt)

        self.log.debug('sending-packet-to-uplink', egress_port=egress_port,
                       packet=self._packet_hex(send_pkt))

        uplink_pkt = openolt_pb2.UplinkPacket(
            intf_id=self.device.platform.intf_id_from_nni_port_num(
                egress_port),
            pkt=send_pkt)

        self.device._grpc.stub.UplinkPacketOut(uplink_pkt)