Merge branch 'master' of yuba:/usr/local/git/openflow-projects/oftest
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 99a6ffb..9c0839e 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -41,7 +41,7 @@
import logging
##@todo Find a better home for these identifiers (controller)
-RCV_SIZE_DEFAULT = 4096
+RCV_SIZE_DEFAULT = 32768
LISTEN_QUEUE_SIZE = 1
class Controller(Thread):
@@ -143,93 +143,104 @@
an echo request in case keep_alive is true, followed by
registered message handlers.
- @param pkt The raw packet (string)
+ @param pkt The raw packet (string) which may contain multiple OF msgs
"""
- # Parse the header to get type
- hdr = of_header_parse(pkt)
- if not hdr:
- self.logger.info("Could not parse header, pkt len", len(pkt))
- self.parse_errors += 1
- return
-
- self.logger.debug("Msg in: len %d. type %s. hdr.len %d" %
- (len(pkt), ofp_type_map[hdr.type], hdr.length))
- if hdr.version != OFP_VERSION:
- self.logger.error("Version %d does not match OFTest version %d"
- % (hdr.version, OFP_VERSION))
- print "Version %d does not match OFTest version %d" % \
- (hdr.version, OFP_VERSION)
- self.active = False
- self.switch_socket = None
- self.kill()
-
- msg = of_message_parse(pkt)
- if not msg:
- self.parse_errors += 1
- self.logger.warn("Could not parse message")
- return
-
- self.sync.acquire()
-
- # Check if transaction is waiting
- self.xid_cv.acquire()
- if self.xid:
- if hdr.xid == self.xid:
- self.logger.debug("Matched expected XID " + str(hdr.xid))
- self.xid_response = (msg, pkt)
- self.xid = None
- self.xid_cv.notify()
- self.xid_cv.release()
- self.sync.release()
+ # Process each of the OF msgs inside the pkt
+ offset = 0
+ while offset < len(pkt):
+ # Parse the header to get type
+ hdr = of_header_parse(pkt[offset:])
+ if not hdr:
+ self.logger.info("Could not parse header, pkt len", len(pkt))
+ self.parse_errors += 1
return
- self.xid_cv.release()
-
- # PREVENT QUEUE ACCESS AT THIS POINT?
- # Check if anyone waiting on this type of message
- self.expect_msg_cv.acquire()
- if self.expect_msg:
- if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
- self.logger.debug("Matched expected msg type "
- + ofp_type_map[hdr.type])
- self.expect_msg_response = (msg, pkt)
- self.expect_msg = False
- self.expect_msg_cv.notify()
- self.expect_msg_cv.release()
- self.sync.release()
- return
- self.expect_msg_cv.release()
-
- # Check if keep alive is set; if so, respond to echo requests
- if self.keep_alive:
- if hdr.type == OFPT_ECHO_REQUEST:
- self.sync.release()
- self.logger.debug("Responding to echo request")
- rep = echo_reply()
- rep.header.xid = hdr.xid
- # Ignoring additional data
- self.message_send(rep.pack(), zero_xid=True)
+ if hdr.length == 0:
+ self.logger.info("Header length is zero")
+ self.parse_errors += 1
return
- # Now check for message handlers; preference is given to
- # handlers for a specific packet
- handled = False
- if hdr.type in self.handlers.keys():
- handled = self.handlers[hdr.type](self, msg, pkt)
- if not handled and ("all" in self.handlers.keys()):
- handled = self.handlers["all"](self, msg, pkt)
+ # Extract the raw message bytes
+ rawmsg = pkt[offset : offset + hdr.length]
- if not handled: # Not handled, enqueue
- self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, pkt))
- self.packets_total += 1
- else:
- self.packets_handled += 1
- self.logger.debug("Message handled by callback")
+ self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
+ (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
+ if hdr.version != OFP_VERSION:
+ self.logger.error("Version %d does not match OFTest version %d"
+ % (hdr.version, OFP_VERSION))
+ print "Version %d does not match OFTest version %d" % \
+ (hdr.version, OFP_VERSION)
+ self.active = False
+ self.switch_socket = None
+ self.kill()
- self.sync.release()
+ msg = of_message_parse(rawmsg)
+ if not msg:
+ self.parse_errors += 1
+ self.logger.warn("Could not parse message")
+ continue
+
+ self.sync.acquire()
+
+ # Check if transaction is waiting
+ self.xid_cv.acquire()
+ if self.xid:
+ if hdr.xid == self.xid:
+ self.logger.debug("Matched expected XID " + str(hdr.xid))
+ self.xid_response = (msg, rawmsg)
+ self.xid = None
+ self.xid_cv.notify()
+ self.xid_cv.release()
+ self.sync.release()
+ continue
+ self.xid_cv.release()
+
+ # PREVENT QUEUE ACCESS AT THIS POINT?
+ # Check if anyone waiting on this type of message
+ self.expect_msg_cv.acquire()
+ if self.expect_msg:
+ if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
+ self.logger.debug("Matched expected msg type "
+ + ofp_type_map[hdr.type])
+ self.expect_msg_response = (msg, rawmsg)
+ self.expect_msg = False
+ self.expect_msg_cv.notify()
+ self.expect_msg_cv.release()
+ self.sync.release()
+ continue
+ self.expect_msg_cv.release()
+
+ # Check if keep alive is set; if so, respond to echo requests
+ if self.keep_alive:
+ if hdr.type == OFPT_ECHO_REQUEST:
+ self.sync.release()
+ self.logger.debug("Responding to echo request")
+ rep = echo_reply()
+ rep.header.xid = hdr.xid
+ # Ignoring additional data
+ self.message_send(rep.pack(), zero_xid=True)
+ continue
+
+ # Now check for message handlers; preference is given to
+ # handlers for a specific packet
+ handled = False
+ if hdr.type in self.handlers.keys():
+ handled = self.handlers[hdr.type](self, msg, rawmsg)
+ if not handled and ("all" in self.handlers.keys()):
+ handled = self.handlers["all"](self, msg, rawmsg)
+
+ if not handled: # Not handled, enqueue
+ self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, rawmsg))
+ self.packets_total += 1
+ else:
+ self.packets_handled += 1
+ self.logger.debug("Message handled by callback")
+
+ self.sync.release()
+ offset += hdr.length
def _socket_ready_handle(self, s):
"""