blob: e52bfbee13c72d13f0d31371a7987f9f9f15f36a [file] [log] [blame]
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17"""
18OpenFlow Test Framework
19
20Controller class
21
22Provide the interface to the control channel to the switch under test.
23
24Class inherits from thread so as to run in background allowing
25asynchronous callbacks (if needed, not required). Also supports
26polling.
27
28The controller thread maintains a queue. Incoming messages that
29are not handled by a callback function are placed in this queue for
30poll calls.
31
32Callbacks and polling support specifying the message type
33
34@todo Support transaction semantics via xid
35@todo Support select and listen on an administrative socket (or
36use a timeout to support clean shutdown).
37
38Currently only one connection is accepted during the life of
39the controller. There seems
40to be no clean way to interrupt an accept call. Using select that also listens
41on an administrative socket and can shut down the socket might work.
42
43"""
44
45import sys
46import os
47import socket
48import time
49import struct
50import select
51import logging
52from threading import Thread
53from threading import Lock
54from threading import Condition
55
56import ofutils
57import loxi
58
59# Configured openflow version
60import ofp as cfg_ofp
61
62FILTER = ''.join( [ (len( repr( chr( x ) ) ) == 3) and chr( x ) or '.'
63 for x in range( 256 ) ] )
64
65
66def hex_dump_buffer( src, length=16 ):
67 """
68 Convert src to a hex dump string and return the string
69 @param src The source buffer
70 @param length The number of bytes shown in each line
71 @returns A string showing the hex dump
72 """
73 result = [ "\n" ]
74 for i in xrange( 0, len( src ), length ):
75 chars = src[ i:i + length ]
76 hex = ' '.join( [ "%02x" % ord( x ) for x in chars ] )
77 printable = ''.join( [ "%s" % ((ord( x ) <= 127 and
78 FILTER[ ord( x ) ]) or '.') for x in
79 chars ] )
80 result.append( "%04x %-*s %s\n" % (i, length * 3, hex, printable) )
81 return ''.join( result )
82
83
84##@todo Find a better home for these identifiers (controller)
85RCV_SIZE_DEFAULT = 32768
86LISTEN_QUEUE_SIZE = 1
87
88
89class Controller( Thread ):
90 """
91 Class abstracting the control interface to the switch.
92
93 For receiving messages, two mechanism will be implemented. First,
94 query the interface with poll. Second, register to have a
95 function called by message type. The callback is passed the
96 message type as well as the raw packet (or message object)
97
98 One of the main purposes of this object is to translate between network
99 and host byte order. 'Above' this object, things should be in host
100 byte order.
101
102 @todo Consider using SocketServer for listening socket
103 @todo Test transaction code
104
105 @var rcv_size The receive size to use for receive calls
106 @var max_pkts The max size of the receive queue
107 @var keep_alive If true, listen for echo requests and respond w/
108 echo replies
109 @var initial_hello If true, will send a hello message immediately
110 upon connecting to the switch
111 @var switch If not None, do an active connection to the switch
112 @var host The host to use for connect
113 @var port The port to connect on
114 @var packets_total Total number of packets received
115 @var packets_expired Number of packets popped from queue as queue full
116 @var packets_handled Number of packets handled by something
117 @var dbg_state Debug indication of state
118 """
119
120 def __init__( self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024,
121 force=False ):
122 Thread.__init__( self )
123 # Socket related
124 self.rcv_size = RCV_SIZE_DEFAULT
125 self.listen_socket = None
126 self.switch_socket = None
127 self.switch_addr = None
128 self.connect_cv = Condition( )
129 self.message_cv = Condition( )
130 self.tx_lock = Lock( )
131
132 # Used to wake up the event loop from another thread
133 self.waker = ofutils.EventDescriptor( )
134
135 # Counters
136 self.socket_errors = 0
137 self.parse_errors = 0
138 self.packets_total = 0
139 self.packets_expired = 0
140 self.packets_handled = 0
141 self.poll_discards = 0
142
143 # State
144 self.sync = Lock( )
145 self.handlers = { }
146 self.keep_alive = False
147 self.active = True
148 self.initial_hello = True
149
150 # OpenFlow message/packet queue
151 # Protected by the packets_cv lock / condition variable
152 self.packets = [ ]
153 self.packets_cv = Condition( )
154 self.packet_in_count = 0
155
156 # Settings
157 self.max_pkts = max_pkts
158 self.switch = switch
159 self.passive = not self.switch
160 self.force = force
161 self.host = host
162 self.port = port
163 self.dbg_state = "init"
164 self.logger = logging.getLogger( "controller" )
165 self.filter_packet_in = False # Drop "excessive" packet ins
166 self.pkt_in_run = 0 # Count on run of packet ins
167 self.pkt_in_filter_limit = 50 # Count on run of packet ins
168 self.pkt_in_dropped = 0 # Total dropped packet ins
169 self.transact_to = 15 # Transact timeout default value; add to config
170
171 # Transaction and message type waiting variables
172 # xid_cv: Condition variable (semaphore) for packet waiters
173 # xid: Transaction ID being waited on
174 # xid_response: Transaction response message
175 self.xid_cv = Condition( )
176 self.xid = None
177 self.xid_response = None
178
179 self.buffered_input = ""
180
181 # Create listen socket
182 if self.passive:
183 self.logger.info( "Create/listen at " + self.host + ":" +
184 str( self.port ) )
185 ai = socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC,
186 socket.SOCK_STREAM, 0, socket.AI_PASSIVE )
187 # Use first returned addrinfo
188 (family, socktype, proto, name, sockaddr) = ai[ 0 ]
189 self.listen_socket = socket.socket( family, socktype )
190 self.listen_socket.setsockopt( socket.SOL_SOCKET,
191 socket.SO_REUSEADDR, 1 )
192 self.listen_socket.bind( sockaddr )
193 self.listen_socket.listen( LISTEN_QUEUE_SIZE )
194
195 def filter_packet( self, rawmsg, hdr ):
196 """
197 Check if packet should be filtered
198
199 Currently filters packet in messages
200 @return Boolean, True if packet should be dropped
201 """
202 # XXX didn't actually check for packet-in...
203 return False
204 # Add check for packet in and rate limit
205 if self.filter_packet_in:
206 # If we were dropping packets, report number dropped
207 # TODO dont drop expected packet ins
208 if self.pkt_in_run > self.pkt_in_filter_limit:
209 self.logger.debug( "Dropped %d packet ins (%d total)"
210 % ((self.pkt_in_run -
211 self.pkt_in_filter_limit),
212 self.pkt_in_dropped) )
213 self.pkt_in_run = 0
214
215 return False
216
217 def _pkt_handle( self, pkt ):
218 """
219 Check for all packet handling conditions
220
221 Parse and verify message
222 Check if XID matches something waiting
223 Check if message is being expected for a poll operation
224 Check if keep alive is on and message is an echo request
225 Check if any registered handler wants the packet
226 Enqueue if none of those conditions is met
227
228 an echo request in case keep_alive is true, followed by
229 registered message handlers.
230 @param pkt The raw packet (string) which may contain multiple OF msgs
231 """
232
233 # snag any left over data from last read()
234 pkt = self.buffered_input + pkt
235 self.buffered_input = ""
236
237 # Process each of the OF msgs inside the pkt
238 offset = 0
239 while offset < len( pkt ):
240 if offset + 8 > len( pkt ):
241 break
242
243 # Parse the header to get type
244 hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(
245 pkt[ offset: ] )
246
247 # Use loxi to resolve to ofp of matching version
248 ofp = loxi.protocol( hdr_version )
249
250 # Extract the raw message bytes
251 if (offset + hdr_length) > len( pkt ):
252 break
253 rawmsg = pkt[ offset: offset + hdr_length ]
254 offset += hdr_length
255
256 # if self.filter_packet(rawmsg, hdr):
257 # continue
258
259 msg = ofp.message.parse_message( rawmsg )
260 if not msg:
261 self.parse_errors += 1
262 self.logger.warn( "Could not parse message" )
263 continue
264
265 self.logger.debug( "Msg in: version %d class %s len %d xid %d",
266 hdr_version, type( msg ).__name__, hdr_length,
267 hdr_xid )
268
269 with self.sync:
270 # Check if transaction is waiting
271 with self.xid_cv:
272 if self.xid and hdr_xid == self.xid:
273 self.logger.debug(
274 "Matched expected XID " + str( hdr_xid ) )
275 self.xid_response = (msg, rawmsg)
276 self.xid = None
277 self.xid_cv.notify( )
278 continue
279
280 # Check if keep alive is set; if so, respond to echo requests
281 if self.keep_alive:
282 if hdr_type == ofp.OFPT_ECHO_REQUEST:
283 self.logger.debug( "Responding to echo request" )
284 rep = ofp.message.echo_reply( )
285 rep.xid = hdr_xid
286 # Ignoring additional data
287 self.message_send( rep )
288 continue
289
290 # Generalize to counters for all packet types?
291 if msg.type == ofp.OFPT_PACKET_IN:
292 self.packet_in_count += 1
293
294 # Log error messages
295 if isinstance( msg, ofp.message.error_msg ):
296 # pylint: disable=E1103
297 if msg.err_type in ofp.ofp_error_type_map:
298 type_str = ofp.ofp_error_type_map[ msg.err_type ]
299 if msg.err_type == ofp.OFPET_HELLO_FAILED:
300 code_map = ofp.ofp_hello_failed_code_map
301 elif msg.err_type == ofp.OFPET_BAD_REQUEST:
302 code_map = ofp.ofp_bad_request_code_map
303 elif msg.err_type == ofp.OFPET_BAD_ACTION:
304 code_map = ofp.ofp_bad_action_code_map
305 elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
306 code_map = ofp.ofp_flow_mod_failed_code_map
307 elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
308 code_map = ofp.ofp_port_mod_failed_code_map
309 elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
310 code_map = ofp.ofp_queue_op_failed_code_map
311 else:
312 code_map = None
313
314 if code_map and msg.code in code_map:
315 code_str = code_map[ msg.code ]
316 else:
317 code_str = "unknown"
318 else:
319 type_str = "unknown"
320 code_str = "unknown"
321 self.logger.warn(
322 "Received error message: xid=%d type=%s (%d) code=%s (%d)",
323 hdr_xid, type_str, msg.err_type, code_str,
324 msg.code )
325
326 # Now check for message handlers; preference is given to
327 # handlers for a specific packet
328 handled = False
329 if hdr_type in self.handlers.keys( ):
330 handled = self.handlers[ hdr_type ]( self, msg, rawmsg )
331 if not handled and ("all" in self.handlers.keys( )):
332 handled = self.handlers[ "all" ]( self, msg, rawmsg )
333
334 if not handled: # Not handled, enqueue
335 with self.packets_cv:
336 if len( self.packets ) >= self.max_pkts:
337 self.packets.pop( 0 )
338 self.packets_expired += 1
339 self.packets.append( (msg, rawmsg) )
340 self.packets_cv.notify_all( )
341 self.packets_total += 1
342 else:
343 self.packets_handled += 1
344 self.logger.debug( "Message handled by callback" )
345
346 # end of 'while offset < len(pkt)'
347 # note that if offset = len(pkt), this is
348 # appends a harmless empty string
349 self.buffered_input += pkt[ offset: ]
350
351 def _socket_ready_handle( self, s ):
352 """
353 Handle an input-ready socket
354
355 @param s The socket object that is ready
356 @returns 0 on success, -1 on error
357 """
358
359 if self.passive and s and s == self.listen_socket:
360 if self.switch_socket:
361 self.logger.warning(
362 "Ignoring incoming connection; already connected to switch" )
363 (sock, addr) = self.listen_socket.accept( )
364 sock.close( )
365 return 0
366
367 try:
368 (sock, addr) = self.listen_socket.accept( )
369 except:
370 self.logger.warning( "Error on listen socket accept" )
371 return -1
372 self.logger.info( self.host + ":" + str(
373 self.port ) + ": Incoming connection from " + str( addr ) )
374
375 with self.connect_cv:
376 (self.switch_socket, self.switch_addr) = (sock, addr)
377 self.switch_socket.setsockopt( socket.IPPROTO_TCP,
378 socket.TCP_NODELAY, True )
379 if self.initial_hello:
380 self.message_send( cfg_ofp.message.hello( ) )
381 self.connect_cv.notify( ) # Notify anyone waiting
382
383 # Prevent further connections
384 self.listen_socket.close( )
385 self.listen_socket = None
386 elif s and s == self.switch_socket:
387 for idx in range( 3 ): # debug: try a couple of times
388 try:
389 pkt = self.switch_socket.recv( self.rcv_size )
390 except:
391 self.logger.warning( "Error on switch read" )
392 return -1
393
394 if not self.active:
395 return 0
396
397 if len( pkt ) == 0:
398 self.logger.warning( "Zero-length switch read, %d" % idx )
399 else:
400 break
401
402 if len( pkt ) == 0: # Still no packet
403 self.logger.warning( "Zero-length switch read; closing cxn" )
404 self.logger.info( str( self ) )
405 return -1
406
407 self._pkt_handle( pkt )
408 elif s and s == self.waker:
409 self.waker.wait( )
410 else:
411 self.logger.error( "Unknown socket ready: " + str( s ) )
412 return -1
413
414 return 0
415
416 def active_connect( self ):
417 """
418 Actively connect to a switch IP addr
419 """
420 try:
421 self.logger.info( "Trying active connection to %s" % self.switch )
422 soc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
423 soc.connect( (self.switch, self.port) )
424 self.logger.info( "Connected to " + self.switch + " on " +
425 str( self.port ) )
426 soc.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, True )
427 self.switch_addr = (self.switch, self.port)
428 return soc
429 except (StandardError, socket.error), e:
430 self.logger.error( "Could not connect to %s at %d:: %s" %
431 (self.switch, self.port, str( e )) )
432 return None
433
434 def wakeup( self ):
435 """
436 Wake up the event loop, presumably from another thread.
437 """
438 self.waker.notify( )
439
440 def sockets( self ):
441 """
442 Return list of sockets to select on.
443 """
444 socs = [ self.listen_socket, self.switch_socket, self.waker ]
445 return [ x for x in socs if x ]
446
447 def run( self ):
448 """
449 Activity function for class
450
451 Assumes connection to switch already exists. Listens on
452 switch_socket for messages until an error (or zero len pkt)
453 occurs.
454
455 When there is a message on the socket, check for handlers; queue the
456 packet if no one handles the packet.
457
458 See note for controller describing the limitation of a single
459 connection for now.
460 """
461
462 self.dbg_state = "running"
463
464 while self.active:
465 try:
466 sel_in, sel_out, sel_err = \
467 select.select( self.sockets( ), [ ], self.sockets( ), 1 )
468 except:
469 print sys.exc_info( )
470 self.logger.error( "Select error, disconnecting" )
471 self.disconnect( )
472
473 for s in sel_err:
474 self.logger.error(
475 "Got socket error on: " + str( s ) + ", disconnecting" )
476 self.disconnect( )
477
478 for s in sel_in:
479 if self._socket_ready_handle( s ) == -1:
480 self.disconnect( )
481
482 # End of main loop
483 self.dbg_state = "closing"
484 self.logger.info( "Exiting controller thread" )
485 self.shutdown( )
486
487 def connect( self, timeout=-1 ):
488 """
489 Connect to the switch
490
491 @param timeout Block for up to timeout seconds. Pass -1 for the default.
492 @return Boolean, True if connected
493 """
494
495 if not self.passive: # Do active connection now
496 self.logger.info( "Attempting to connect to %s on port %s" %
497 (self.switch, str( self.port )) )
498 soc = self.active_connect( )
499 if soc:
500 self.logger.info( "Connected to %s", self.switch )
501 self.dbg_state = "running"
502 self.switch_socket = soc
503 self.wakeup( )
504 with self.connect_cv:
505 if self.initial_hello:
506 self.message_send( cfg_ofp.message.hello( ) )
507 self.connect_cv.notify( ) # Notify anyone waiting
508 else:
509 self.logger.error( "Could not actively connect to switch %s",
510 self.switch )
511 self.active = False
512 else:
513 with self.connect_cv:
514 ofutils.timed_wait( self.connect_cv, lambda: self.switch_socket,
515 timeout=timeout )
516
517 return self.switch_socket is not None
518
519 def disconnect( self, timeout=-1 ):
520 """
521 If connected to a switch, disconnect.
522 """
523 if self.switch_socket:
524 self.switch_socket.close( )
525 self.switch_socket = None
526 self.switch_addr = None
527 with self.packets_cv:
528 self.packets = [ ]
529 with self.connect_cv:
530 self.connect_cv.notifyAll( )
531
532 def wait_disconnected( self, timeout=-1 ):
533 """
534 @param timeout Block for up to timeout seconds. Pass -1 for the default.
535 @return Boolean, True if disconnected
536 """
537
538 with self.connect_cv:
539 ofutils.timed_wait( self.connect_cv,
540 lambda: True if not self.switch_socket else None,
541 timeout=timeout )
542 return self.switch_socket is None
543
544 def kill( self ):
545 """
546 Force the controller thread to quit
547 """
548 self.active = False
549 self.wakeup( )
550 self.join( )
551
552 def shutdown( self ):
553 """
554 Shutdown the controller closing all sockets
555
556 @todo Might want to synchronize shutdown with self.sync...
557 """
558
559 self.active = False
560 try:
561 self.switch_socket.shutdown( socket.SHUT_RDWR )
562 except:
563 self.logger.info( "Ignoring switch soc shutdown error" )
564 self.switch_socket = None
565
566 try:
567 self.listen_socket.shutdown( socket.SHUT_RDWR )
568 except:
569 self.logger.info( "Ignoring listen soc shutdown error" )
570 self.listen_socket = None
571
572 # Wakeup condition variables on which controller may be wait
573 with self.xid_cv:
574 self.xid_cv.notifyAll( )
575
576 with self.connect_cv:
577 self.connect_cv.notifyAll( )
578
579 self.wakeup( )
580 self.dbg_state = "down"
581
582 def register( self, msg_type, handler ):
583 """
584 Register a callback to receive a specific message type.
585
586 Only one handler may be registered for a given message type.
587
588 WARNING: A lock is held during the handler call back, so
589 the handler should not make any blocking calls
590
591 @param msg_type The type of message to receive. May be DEFAULT
592 for all non-handled packets. The special type, the string "all"
593 will send all packets to the handler.
594 @param handler The function to call when a message of the given
595 type is received.
596 """
597 # Should check type is valid
598 if not handler and msg_type in self.handlers.keys( ):
599 del self.handlers[ msg_type ]
600 return
601 self.handlers[ msg_type ] = handler
602
603 def poll( self, exp_msg=None, timeout=-1 ):
604 """
605 Wait for the next OF message received from the switch.
606
607 @param exp_msg If set, return only when this type of message
608 is received (unless timeout occurs).
609
610 @param timeout Maximum number of seconds to wait for the message.
611 Pass -1 for the default timeout.
612
613 @retval A pair (msg, pkt) where msg is a message object and pkt
614 the string representing the packet as received from the socket.
615 This allows additional parsing by the receiver if necessary.
616
617 The data members in the message are in host endian order.
618 If an error occurs, (None, None) is returned
619 """
620
621 if exp_msg is None:
622 self.logger.warn( "DEPRECATED polling for any message class" )
623 klass = None
624 elif isinstance( exp_msg, int ):
625 klass = cfg_ofp.message.message.subtypes[ exp_msg ]
626 elif issubclass( exp_msg, loxi.OFObject ):
627 klass = exp_msg
628 else:
629 raise ValueError( "Unexpected exp_msg argument %r" % exp_msg )
630
631 self.logger.debug( "Polling for %s", klass.__name__ )
632
633 # Take the packet from the queue
634 def grab( ):
635 for i, (msg, pkt) in enumerate( self.packets ):
636 if klass is None or isinstance( msg, klass ):
637 self.logger.debug( "Got %s message",
638 msg.__class__.__name__ )
639 return self.packets.pop( i )
640 # Not found
641 self.logger.debug( "%s message not in queue", klass.__name__ )
642 return None
643
644 with self.packets_cv:
645 ret = ofutils.timed_wait( self.packets_cv, grab, timeout=timeout )
646
647 if ret != None:
648 (msg, pkt) = ret
649 return (msg, pkt)
650 else:
651 return (None, None)
652
653 def transact( self, msg, timeout=-1 ):
654 """
655 Run a message transaction with the switch
656
657 Send the message in msg and wait for a reply with a matching
658 transaction id. Transactions have the highest priority in
659 received message handling.
660
661 @param msg The message object to send; must not be a string
662 @param timeout The timeout in seconds; if -1 use default.
663 """
664
665 if msg.xid == None:
666 msg.xid = ofutils.gen_xid( )
667
668 self.logger.debug( "Running transaction %d" % msg.xid )
669
670 with self.xid_cv:
671 if self.xid:
672 self.logger.error( "Can only run one transaction at a time" )
673 return (None, None)
674
675 self.xid = msg.xid
676 self.xid_response = None
677 self.message_send( msg )
678
679 self.logger.debug( "Waiting for transaction %d" % msg.xid )
680 ofutils.timed_wait( self.xid_cv, lambda: self.xid_response,
681 timeout=timeout )
682
683 if self.xid_response:
684 (resp, pkt) = self.xid_response
685 self.xid_response = None
686 else:
687 (resp, pkt) = (None, None)
688
689 if resp is None:
690 self.logger.warning( "No response for xid " + str( self.xid ) )
691 return (resp, pkt)
692
693 def message_send( self, msg ):
694 """
695 Send the message to the switch
696
697 @param msg A string or OpenFlow message object to be forwarded to
698 the switch.
699 """
700 if not self.switch_socket:
701 # Sending a string indicates the message is ready to go
702 raise Exception( "no socket" )
703
704 if msg.xid == None:
705 msg.xid = ofutils.gen_xid( )
706
707 outpkt = msg.pack( )
708
709 self.logger.debug( "Msg out: version %d class %s len %d xid %d",
710 msg.version, type( msg ).__name__, len( outpkt ),
711 msg.xid )
712
713 with self.tx_lock:
714 if self.switch_socket.sendall( outpkt ) is not None:
715 raise AssertionError( "failed to send message to switch" )
716
717 return 0 # for backwards compatibility
718
719 def clear_queue( self ):
720 """
721 Clear the input queue and report the number of messages
722 that were in it
723 """
724 enqueued_pkt_count = len( self.packets )
725 with self.packets_cv:
726 self.packets = [ ]
727 return enqueued_pkt_count
728
729 def __str__( self ):
730 string = "Controller:\n"
731 string += " state " + self.dbg_state + "\n"
732 string += " switch_addr " + str( self.switch_addr ) + "\n"
733 string += " pending pkts " + str( len( self.packets ) ) + "\n"
734 string += " total pkts " + str( self.packets_total ) + "\n"
735 string += " expired pkts " + str( self.packets_expired ) + "\n"
736 string += " handled pkts " + str( self.packets_handled ) + "\n"
737 string += " poll discards " + str( self.poll_discards ) + "\n"
738 string += " parse errors " + str( self.parse_errors ) + "\n"
739 string += " sock errrors " + str( self.socket_errors ) + "\n"
740 string += " max pkts " + str( self.max_pkts ) + "\n"
741 string += " target switch " + str( self.switch ) + "\n"
742 string += " host " + str( self.host ) + "\n"
743 string += " port " + str( self.port ) + "\n"
744 string += " keep_alive " + str( self.keep_alive ) + "\n"
745 string += " pkt_in_run " + str( self.pkt_in_run ) + "\n"
746 string += " pkt_in_dropped " + str( self.pkt_in_dropped ) + "\n"
747 return string
748
749 def show( self ):
750 print str( self )
751
752
753def sample_handler( controller, msg, pkt ):
754 """
755 Sample message handler
756
757 This is the prototype for functions registered with the controller
758 class for packet reception
759
760 @param controller The controller calling the handler
761 @param msg The parsed message object
762 @param pkt The raw packet that was received on the socket. This is
763 in case the packet contains extra unparsed data.
764 @returns Boolean value indicating if the packet was handled. If
765 not handled, the packet is placed in the queue for pollers to received
766 """
767 pass