# blob: 0462f5d735f0fb5719481fd9c946ddea0062361c [file] [log] [blame]
"""
OpenFlow Test Framework

DataPlane and DataPlanePort classes

Provide the interface to control the set of ports being used
to stimulate the switch under test.

See the class DataPlanePort for more details.  The DataPlane class
wraps a set of those objects, allowing general calls and parsing
configuration.

@todo Add "filters" for matching packets.  Actions supported
for filters should include a callback or a counter
"""
16
import os
import socket
import sys
import time
from threading import Lock
from threading import Thread

import netutils
24
# @todo Move these identifiers into config

# Protocol value handed (after socket.htons) to the raw AF_PACKET socket
# created in DataPlanePort.interface_open.
ETH_P_ALL = 0x03

# Value passed to socket.settimeout() on each port socket.  settimeout()
# interprets its argument in seconds.
# NOTE(review): 10000 seconds is close to three hours -- confirm that
# milliseconds were not intended.
RCV_TIMEOUT = 10000

# Maximum number of bytes requested from each socket.recv() call.
RCV_SIZE = 4096
29
class DataPlanePort(Thread):
    """
    Class defining a port monitoring object.

    Control a dataplane port connected to the switch under test.
    Creates a promiscuous socket on a physical interface.
    Queues the packets received on that interface with time stamps.
    Inherits from Thread class as meant to run in background.  Also
    supports polling.
    Use accessors to dequeue packets for proper synchronization.
    """

    def __init__(self, interface_name, max_pkts=1024):
        """
        Set up a port monitor object
        @param interface_name The name of the physical interface like eth1
        @param max_pkts Maximum number of pkts to keep in queue.  Once the
        queue is full, the oldest packet is dropped for each new arrival.
        A false value (0 or None) leaves the queue unbounded.
        """
        Thread.__init__(self)
        self.interface_name = interface_name
        self.max_pkts = max_pkts
        self.packets_pending = 0
        self.packets_total = 0
        self.packets = []
        self.packet_times = []
        self.sync = Lock()
        # Defined here so the attribute exists even if kill() is called
        # before run() has started.
        self.running = False
        self.socket = self.interface_open(interface_name)
        print("Openned port monitor socket " + interface_name)

    def interface_open(self, interface_name):
        """
        Open a socket in a promiscuous mode for a data connection.
        @param interface_name port name as a string such as 'eth1'
        @retval s socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                          socket.htons(ETH_P_ALL))
        try:
            s.bind((interface_name, 0))
            # This file imports netutils, not promisc.
            # NOTE(review): assumes netutils provides set_promisc(socket,
            # interface_name) -- confirm against netutils.
            netutils.set_promisc(s, interface_name)
            s.settimeout(RCV_TIMEOUT)
        except Exception:
            # Do not leak the raw socket when setup fails part-way.
            s.close()
            raise
        return s

    def kill(self):
        """
        Terminate the running thread
        """
        # Clear the flag first so run() can tell a deliberate shutdown
        # from an unexpected socket error once the socket is closed.
        self.running = False
        self.socket.close()
        print("Port monitor for " + self.interface_name + " exiting")

    def _enqueue(self, pkt, pkt_time):
        """
        Append a received packet to the queue, honoring max_pkts
        @param pkt The raw packet data
        @param pkt_time The time stamp recorded for the packet
        """
        self.sync.acquire()
        try:
            if self.max_pkts and len(self.packets) >= self.max_pkts:
                # Queue is full: discard the oldest entry to make room.
                del self.packets[0]
                del self.packet_times[0]
                self.packets_pending -= 1
            self.packets.append(pkt)
            self.packet_times.append(pkt_time)
            self.packets_pending += 1
            self.packets_total += 1
        finally:
            self.sync.release()

    def run(self):
        """
        Activity function for class

        Receives packets from the socket and queues them with a time
        stamp until kill() is called or the socket reports an error.
        """
        self.running = True
        while self.running:
            try:
                rcvmsg = self.socket.recv(RCV_SIZE)
            except socket.timeout:
                print("Socket timeout for " + self.interface_name)
                continue
            except socket.error:
                print("Socket closed for " + self.interface_name)
                if self.running:
                    self.kill()
                break
            # NOTE(review): time.clock() is kept so time stamps stay on the
            # same basis callers already see; confirm whether wall-clock
            # time (time.time()) is what is wanted here.
            rcvtime = time.clock()
            self._enqueue(rcvmsg, rcvtime)

    def dequeue(self):
        """
        Get the oldest packet in the queue
        @retval The pair packet, pkt_time; (None, None) if the queue
        is empty
        """
        self.sync.acquire()
        try:
            if not self.packets:
                return None, None
            pkt = self.packets.pop(0)
            pkt_time = self.packet_times.pop(0)
            self.packets_pending -= 1
            return pkt, pkt_time
        finally:
            # Always release, so an empty queue cannot wedge the receiver.
            self.sync.release()

    def timestamp_head(self):
        """
        Return the timestamp of the head of queue or None if empty
        """
        self.sync.acquire()
        try:
            if self.packets_pending:
                return self.packet_times[0]
            return None
        finally:
            self.sync.release()

    def flush(self):
        """
        Clear the packet queue
        """
        self.sync.acquire()
        try:
            self.packets = []
            self.packet_times = []
            self.packets_pending = 0
        finally:
            self.sync.release()

    def send(self, packet):
        """
        Send a packet to the dataplane port
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        return self.socket.send(packet)

    def register(self, handler):
        """
        Register a callback function to receive packets from this
        port.  The callback will be passed the packet, the
        interface name and the port number (if set) on which the
        packet was received.

        To be implemented
        """
        pass
154
Dan Talayco34089522010-02-07 23:07:41 -0800155
class DataPlane:
    """
    Class defining access primitives to the data plane
    Controls a list of DataPlanePort objects
    """

    def __init__(self):
        # Maps port number -> DataPlanePort object
        self.port_list = {}

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane and start its monitor thread
        TBD: Max packets for queue?
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        """
        port = DataPlanePort(interface_name)
        self.port_list[port_number] = port
        port.start()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @retval The number of bytes sent
        """
        sent = self.port_list[port_number].send(packet)
        if sent != len(packet):
            print("Unhandled send error, length mismatch %d != %d" %
                  (sent, len(packet)))
        return sent

    def flood(self, packet):
        """
        Send a packet to all ports
        @param packet Raw packet data to send to port
        """
        for port_number, port in self.port_list.items():
            sent = port.send(packet)
            if sent != len(packet):
                print("Unhandled send error, port %d, length mismatch "
                      "%d != %d" % (port_number, sent, len(packet)))

    def packet_get(self, port_number=None):
        """
        Get a packet from the data plane
        If port_number is given, get the packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.
        @param port_number If set, get packet from this port
        @retval The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  All three are
        None when no packet is available.
        """
        # Compare against None so that port number 0 is a valid request.
        if port_number is not None:
            port = self.port_list[port_number]
            if port.packets_pending != 0:
                pkt, pkt_time = port.dequeue()
                return port_number, pkt, pkt_time
            return None, None, None

        # Find port with oldest packet.  A time stamp of 0.0 is a real
        # value, so emptiness is tested with "is None", not truthiness.
        oldest_port = None
        oldest_time = None
        for number, port in self.port_list.items():
            head_time = port.timestamp_head()
            if head_time is None:
                continue
            if oldest_time is None or head_time < oldest_time:
                oldest_time = head_time
                oldest_port = number

        if oldest_time is None:
            return None, None, None

        pkt, pkt_time = self.port_list[oldest_port].dequeue()
        return oldest_port, pkt, pkt_time

    def kill(self):
        """
        Shut down every port monitor in the dataplane
        """
        for port in self.port_list.values():
            port.kill()

        print("DataPlane shutdown")
237 print "DataPlane shutdown"