blob: a9411112b2ea03d781aeecd6e8e37129970afc01 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from hexdump import hexdump
18from twisted.internet import protocol
19
20import loxi.of14
William Kurkianfc0dcda2019-04-08 16:54:36 -040021from pyvoltha.common.utils.message_queue import MessageQueue
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050022
23log = structlog.get_logger()
24
25
26class OpenFlowConnection(protocol.Protocol):
27
28 def __init__(self, agent):
29 self.agent = agent # the protocol will call agent.enter_disconnected()
30 # and agent.enter_connected() methods to indicate
31 # when state change is necessary
32 self.next_xid = 1
33 self.read_buffer = None
34 self.rx = MessageQueue()
35
36 def connectionLost(self, reason):
37 self.agent.enter_disconnected('connection-lost', reason)
38
39 def connectionMade(self):
40 self.agent.enter_connected()
41
42 def dataReceived(self, data):
43 log.debug('data-received', len=len(data),
44 received=hexdump(data, result='return'))
45
46 assert len(data) # connection close shall be handled by the protocol
47 buf = self.read_buffer
48 if buf:
49 buf += data
50 else:
51 buf = data
52
53 offset = 0
54 while offset < len(buf):
55 if offset + 8 > len(buf):
56 break # not enough data for the OpenFlow header
57
58 # parse the header to get type
59 _version, _type, _len, _xid = \
60 loxi.of14.message.parse_header(buf[offset:])
61
62 ofp = loxi.protocol(_version)
63
64 if (offset + _len) > len(buf):
65 break # not enough data to cover whole message
66
67 rawmsg = buf[offset : offset + _len]
68 offset += _len
69
70 msg = ofp.message.parse_message(rawmsg)
71 if not msg:
72 log.warn('could-not-parse',
73 data=hexdump(rawmsg, result='return'))
74 log.debug('received-msg', module=type(msg).__module__,
75 name=type(msg).__name__, xid=msg.xid, len=len(buf))
76 self.rx.put(msg)
77
78 if offset == len(buf):
79 self.read_buffer = None
80 else:
81 self.read_buffer = buf[offset:]
82 log.debug('remaining', len=len(self.read_buffer))
83
84 def send_raw(self, buf):
85 """
86 Send raw bytes on the socket
87 :param buf: bytes buffer
88 :return: None
89 """
90 assert self.connected
91 log.debug('sending-raw', len=len(buf))
92 self.transport.write(buf)
93
94 def send(self, msg):
95 """
96 Send a message
97 :param msg: An OpenFlow protocol message
98 :return: None
99 """
100 assert self.connected
101
102 if msg.xid is None:
103 msg.xid = self._gen_xid()
104 buf = msg.pack()
105 log.debug('sending', module=type(msg).__module__,
106 name=type(msg).__name__, xid=msg.xid, len=len(buf))
107 self.transport.write(buf)
108 log.debug('data-sent', sent=hexdump(buf, result='return'))
109
110 def recv(self, predicate):
111 assert self.connected
112 return self.rx.get(predicate)
113
114 def recv_any(self):
115 return self.recv(lambda _: True)
116
117 def recv_xid(self, xid):
118 return self.recv(lambda msg: msg.xid == xid)
119
120 def recv_class(self, klass):
121 return self.recv(lambda msg: isinstance(msg, klass))
122
123 def _gen_xid(self):
124 xid = self.next_xid
125 self.next_xid += 1
126 return xid