blob: cabcc6c0fad5a7fd1cad82d4ca0a81687b71e046 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19import structlog
20from twisted.internet import protocol
21from twisted.internet import reactor
22from twisted.internet.defer import Deferred, inlineCallbacks
23
24import loxi.of13 as of13
25from common.utils.asleep import asleep
26from of_connection import OpenFlowConnection
27from of_protocol_handler import OpenFlowProtocolHandler
28
29
30log = structlog.get_logger()
31
32
33class Agent(protocol.ClientFactory):
34
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070035 def __init__(self,
36 controller_endpoint,
37 datapath_id,
38 device_id,
39 rpc_stub,
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070040 conn_retry_interval=1):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070041
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070042 self.controller_endpoint = controller_endpoint
43 self.datapath_id = datapath_id
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070044 self.device_id = device_id
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070045 self.rpc_stub = rpc_stub
46 self.retry_interval = conn_retry_interval
47
48 self.running = False
49 self.connector = None # will be a Connector instance once connected
50 self.d_disconnected = None # a deferred to signal reconnect loop when
51 # TCP connection is lost
52 self.connected = False
53 self.exiting = False
54
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070055 def get_device_id(self):
56 return self.device_id
57
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070058 def start(self):
59 log.debug('starting')
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070060 if self.running:
61 return
62 self.running = True
63 reactor.callLater(0, self.keep_connected)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070064 log.info('started')
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070065 return self
66
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070067 def stop(self):
68 log.debug('stopping')
69 self.connected = False
70 self.exiting = True
71 self.connector.disconnect()
72 log.info('stopped')
73
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070074 def resolve_endpoint(self, endpoint):
75 # TODO allow resolution via consul
76 host, port = endpoint.split(':', 2)
77 return host, int(port)
78
79 @inlineCallbacks
80 def keep_connected(self):
81 """Keep reconnecting to the controller"""
82 while not self.exiting:
83 host, port = self.resolve_endpoint(self.controller_endpoint)
84 log.info('connecting', host=host, port=port)
85 self.connector = reactor.connectTCP(host, port, self)
86 self.d_disconnected = Deferred()
87 yield self.d_disconnected
88 log.debug('reconnect', after_delay=self.retry_interval)
89 yield asleep(self.retry_interval)
90
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070091 def enter_disconnected(self, event, reason):
92 """Internally signal entering disconnected state"""
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070093 self.connected = False
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070094 if not self.exiting:
95 log.error(event, reason=reason)
96 self.d_disconnected.callback(None)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070097
98 def enter_connected(self):
99 """Handle transitioning from disconnected to connected state"""
100 log.info('connected')
101 self.connected = True
102 self.read_buffer = None
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700103 reactor.callLater(0, self.proto_handler.start)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700104
105 # protocol.ClientFactory methods
106
107 def protocol(self):
108 cxn = OpenFlowConnection(self) # Low level message handler
109 self.proto_handler = OpenFlowProtocolHandler(
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700110 self.datapath_id, self.device_id, self, cxn, self.rpc_stub)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700111 return cxn
112
113 def clientConnectionFailed(self, connector, reason):
114 self.enter_disconnected('connection-failed', reason)
115
116 def clientConnectionLost(self, connector, reason):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700117 if not self.exiting:
118 log.error('client-connection-lost',
119 reason=reason, connector=connector)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700120
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700121 def forward_packet_in(self, ofp_packet_in):
122 self.proto_handler.forward_packet_in(ofp_packet_in)
123
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700124
125if __name__ == '__main__':
126 """Run this to test the agent for N concurrent sessions:
127 python agent [<number-of-desired-instances>]
128 """
129
130 n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
131
132 from utils import mac_str_to_tuple
133
134 class MockRpc(object):
135 @staticmethod
136 def get_port_list(_):
137 ports = []
138 cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
139 for pno, mac, nam, cur, adv, sup, spe in (
140 (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
141 of13.OFPPF_1GB_FD),
142 (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
143 of13.OFPPF_1GB_FD),
144 (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
145 of13.OFPPF_1GB_FD)
146 ):
147 port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
148 curr=cur, advertised=adv,
149 supported=sup,
150 curr_speed=spe, max_speed=spe)
151 ports.append(port)
152 return ports
153
154 stub = MockRpc()
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700155 agents = [Agent('localhost:6633', 256 + i, stub).start() for i in range(n)]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700156
157 def shutdown():
158 [a.stop() for a in agents]
159
160 reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
161 reactor.run()
162