blob: 5196e6d7b9c4c5b0ce7d3c73961f3e389fa2bac4 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19import structlog
20from twisted.internet import protocol
21from twisted.internet import reactor
22from twisted.internet.defer import Deferred, inlineCallbacks
23
24import loxi.of13 as of13
25from common.utils.asleep import asleep
26from of_connection import OpenFlowConnection
27from of_protocol_handler import OpenFlowProtocolHandler
28
29
30log = structlog.get_logger()
31
32
33class Agent(protocol.ClientFactory):
34
35 def __init__(self, controller_endpoint, datapath_id, rpc_stub,
36 conn_retry_interval=1):
37 self.controller_endpoint = controller_endpoint
38 self.datapath_id = datapath_id
39 self.rpc_stub = rpc_stub
40 self.retry_interval = conn_retry_interval
41
42 self.running = False
43 self.connector = None # will be a Connector instance once connected
44 self.d_disconnected = None # a deferred to signal reconnect loop when
45 # TCP connection is lost
46 self.connected = False
47 self.exiting = False
48
49 def run(self):
50 if self.running:
51 return
52 self.running = True
53 reactor.callLater(0, self.keep_connected)
54 return self
55
56 def resolve_endpoint(self, endpoint):
57 # TODO allow resolution via consul
58 host, port = endpoint.split(':', 2)
59 return host, int(port)
60
61 @inlineCallbacks
62 def keep_connected(self):
63 """Keep reconnecting to the controller"""
64 while not self.exiting:
65 host, port = self.resolve_endpoint(self.controller_endpoint)
66 log.info('connecting', host=host, port=port)
67 self.connector = reactor.connectTCP(host, port, self)
68 self.d_disconnected = Deferred()
69 yield self.d_disconnected
70 log.debug('reconnect', after_delay=self.retry_interval)
71 yield asleep(self.retry_interval)
72
73 def stop(self):
74 self.connected = False
75 self.exiting = True
76 self.connector.disconnect()
77 log.info('stopped')
78
79 def enter_disconnected(self, event, reason):
80 """Internally signal entering disconnected state"""
81 log.error(event, reason=reason)
82 self.connected = False
83 self.d_disconnected.callback(None)
84
85 def enter_connected(self):
86 """Handle transitioning from disconnected to connected state"""
87 log.info('connected')
88 self.connected = True
89 self.read_buffer = None
90 reactor.callLater(0, self.proto_handler.run)
91
92 # protocol.ClientFactory methods
93
94 def protocol(self):
95 cxn = OpenFlowConnection(self) # Low level message handler
96 self.proto_handler = OpenFlowProtocolHandler(
97 self.datapath_id, self, cxn, self.rpc_stub)
98 return cxn
99
100 def clientConnectionFailed(self, connector, reason):
101 self.enter_disconnected('connection-failed', reason)
102
103 def clientConnectionLost(self, connector, reason):
104 log.error('client-connection-lost',
105 reason=reason, connector=connector)
106
107
108if __name__ == '__main__':
109 """Run this to test the agent for N concurrent sessions:
110 python agent [<number-of-desired-instances>]
111 """
112
113 n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
114
115 from utils import mac_str_to_tuple
116
117 class MockRpc(object):
118 @staticmethod
119 def get_port_list(_):
120 ports = []
121 cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
122 for pno, mac, nam, cur, adv, sup, spe in (
123 (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
124 of13.OFPPF_1GB_FD),
125 (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
126 of13.OFPPF_1GB_FD),
127 (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
128 of13.OFPPF_1GB_FD)
129 ):
130 port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
131 curr=cur, advertised=adv,
132 supported=sup,
133 curr_speed=spe, max_speed=spe)
134 ports.append(port)
135 return ports
136
137 stub = MockRpc()
138 agents = [Agent('localhost:6633', 256 + i, stub).run() for i in range(n)]
139
140 def shutdown():
141 [a.stop() for a in agents]
142
143 reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
144 reactor.run()
145