blob: bc6f86270450484f0c80498d5602437f9f302845 [file] [log] [blame]
Rich Lane17215b32012-10-26 18:01:12 -07001"""
2Dummy platform
3
4This platform uses Open vSwitch dummy interfaces.
5"""
6
7import logging
8import os
9import select
10import socket
11import struct
12import sys
13import time
14from threading import Thread
15from threading import Lock
16
17RCV_TIMEOUT = 10000
18RUN_DIR = os.environ.get("OVS_RUNDIR", "/var/run/openvswitch")
19
20class DataPlanePortOVSDummy(Thread):
21 """
22 Class defining a port monitoring object that uses Unix domain
23 sockets for ports, intended for connecting to Open vSwitch "dummy"
24 netdevs.
25 """
26
27 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
28 """
29 Set up a port monitor object
30 @param interface_name The name of the physical interface like eth1
31 @param port_number The port number associated with this port
32 @param parent The controlling dataplane object; for pkt wait CV
33 @param max_pkts Maximum number of pkts to keep in queue
34 """
35 Thread.__init__(self)
36 self.interface_name = interface_name
37 self.max_pkts = max_pkts
38 self.packets_total = 0
39 self.packets = []
40 self.packets_discarded = 0
41 self.port_number = port_number
42 self.txq = []
43 self.txq_lock = Lock()
44 logname = "dp-" + interface_name
45 self.logger = logging.getLogger(logname)
46 try:
47 self.socket = DataPlanePortOVSDummy.interface_open(interface_name)
48 except:
49 self.logger.info("Could not open socket")
50 raise
51 self.logger.info("Opened port monitor (class %s)", type(self).__name__)
52 self.parent = parent
53
54 @staticmethod
55 def interface_open(interface_name):
56 """
57 Open a Unix domain socket interface.
58 @param interface_name port name as a string such as 'eth1'
59 @retval s socket
60 """
61 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
62 s.settimeout(RCV_TIMEOUT)
63 s.setblocking(0)
64 s.connect("%s/%s" % (RUN_DIR, interface_name))
65 return s
66
67 def run(self):
68 """
69 Activity function for class
70 """
71 self.running = True
72 rxbuf = ""
73 while self.running:
74 try:
75 self.txq_lock.acquire()
76 if self.txq:
77 wlist = [self.socket]
78 else:
79 wlist = []
80 self.txq_lock.release()
81
82 rout, wout, eout = select.select([self.socket], wlist, [], 1)
83 except:
84 print sys.exc_info()
85 self.logger.error("Select error, exiting")
86 break
87
88 if not self.running:
89 break
90
91 if wout:
92 self.txq_lock.acquire()
93 if self.txq:
94 retval = self.socket.send(self.txq[0])
95 if retval > 0:
96 self.txq[0] = self.txq[0][retval:]
97 if len(self.txq[0]) == 0:
98 self.txq = self.txq[1:]
99 self.txq_lock.release()
100
101 if rout:
102 if len(rxbuf) < 2:
103 n = 2 - len(rxbuf)
104 else:
105 frame_len = struct.unpack('>h', rxbuf[:2])[0]
106 n = (2 + frame_len) - len(rxbuf)
107
108 data = self.socket.recv(n)
109 rxbuf += data
110 if len(data) == n and len(rxbuf) > 2:
111 rcvtime = time.clock()
112 self.logger.debug("Pkt len " + str(len(rxbuf)) +
113 " in at " + str(rcvtime) + " on port " +
114 str(self.port_number))
115
116 # Enqueue packet
117 with self.parent.pkt_sync:
118 if len(self.packets) >= self.max_pkts:
119 # Queue full, throw away oldest
120 self.packets.pop(0)
121 self.packets_discarded += 1
122 self.logger.debug("Discarding oldest packet to make room")
123 self.packets.append((rxbuf[2:], rcvtime))
124 self.packets_total += 1
125 self.parent.pkt_sync.notify_all()
126
127 rxbuf = ''
128
129 self.logger.info("Thread exit")
130
131 def kill(self):
132 """
133 Terminate the running thread
134 """
135 self.logger.debug("Port monitor kill")
136 self.running = False
137 try:
138 self.socket.close()
139 except:
140 self.logger.info("Ignoring dataplane soc shutdown error")
141
142 def timestamp_head(self):
143 """
144 Return the timestamp of the head of queue or None if empty
145 """
146 rv = None
147 try:
148 rv = self.packets[0][1]
149 except:
150 rv = None
151 return rv
152
153 def flush(self):
154 """
155 Clear the packet queue
156 """
157 with self.parent.pkt_sync:
158 self.packets_discarded += len(self.packets)
159 self.packets = []
160
161 def send(self, packet):
162 """
163 Send a packet to the dataplane port
164 @param packet The packet data to send to the port
165 @retval The number of bytes sent
166 """
167
168 self.txq_lock.acquire()
169 if len(self.txq) < self.max_pkts:
170 self.txq.append(struct.pack('>h', len(packet)) + packet)
171 retval = len(packet)
172 else:
173 retval = 0
174 self.txq_lock.release()
175
176 return retval
177
178 def register(self, handler):
179 """
180 Register a callback function to receive packets from this
181 port. The callback will be passed the packet, the
182 interface name and the port number (if set) on which the
183 packet was received.
184
185 To be implemented
186 """
187 pass
188
189 def show(self, prefix=''):
190 print prefix + "Name: " + self.interface_name
191 print prefix + "Pkts pending: " + str(len(self.packets))
192 print prefix + "Pkts total: " + str(self.packets_total)
193 print prefix + "socket: " + str(self.socket)
194
195# Update this dictionary to suit your environment.
196dummy_port_map = {
197 1 : "p1",
198 2 : "p2",
199 3 : "p3",
200 4 : "p4"
201}
202
203def platform_config_update(config):
204 """
205 Update configuration for the dummy platform
206
207 @param config The configuration dictionary to use/update
208 """
209 global dummy_port_map
210 config["port_map"] = dummy_port_map.copy()
211 config["caps_table_idx"] = 0
212 config["dataplane"] = {"portclass": DataPlanePortOVSDummy}
213 config["allow_user"] = True