blob: cb59d249ea7d83c63e21e476a7ff44bd61b9e36b [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19import structlog
Girishf6eeaea2017-11-13 10:53:57 +053020import os.path
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070021from twisted.internet import protocol
22from twisted.internet import reactor
schowdhury3fbd4702017-06-30 03:50:25 -070023from twisted.internet import reactor, ssl
24from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070025from twisted.internet.defer import Deferred, inlineCallbacks
26
27import loxi.of13 as of13
28from common.utils.asleep import asleep
29from of_connection import OpenFlowConnection
30from of_protocol_handler import OpenFlowProtocolHandler
Khen Nursimulu7626ce12016-12-21 11:51:46 -050031# from ofagent.protos.openflow_13_pb2 import ChangeEvent
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070032
33log = structlog.get_logger()
34
35
36class Agent(protocol.ClientFactory):
37
sathishg00433e52017-05-23 16:07:41 +053038 generation_is_defined = False
39 cached_generation_id = None
40
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070041 def __init__(self,
42 controller_endpoint,
43 datapath_id,
44 device_id,
45 rpc_stub,
Girishf6eeaea2017-11-13 10:53:57 +053046 enable_tls=False,
47 key_file=None,
48 cert_file=None,
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070049 conn_retry_interval=1):
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070050
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070051 self.controller_endpoint = controller_endpoint
52 self.datapath_id = datapath_id
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070053 self.device_id = device_id
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070054 self.rpc_stub = rpc_stub
Girishf6eeaea2017-11-13 10:53:57 +053055 self.enable_tls = enable_tls
56 self.key_file = key_file
57 self.cert_file = cert_file
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070058 self.retry_interval = conn_retry_interval
59
60 self.running = False
61 self.connector = None # will be a Connector instance once connected
62 self.d_disconnected = None # a deferred to signal reconnect loop when
63 # TCP connection is lost
64 self.connected = False
65 self.exiting = False
Zsolt Haraszti47324632016-12-20 00:50:33 -080066 self.proto_handler = None
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070067
Zsolt Haraszticd22adc2016-10-25 00:13:06 -070068 def get_device_id(self):
69 return self.device_id
70
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070071 def start(self):
72 log.debug('starting')
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070073 if self.running:
74 return
75 self.running = True
76 reactor.callLater(0, self.keep_connected)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070077 log.info('started')
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070078 return self
79
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070080 def stop(self):
81 log.debug('stopping')
82 self.connected = False
83 self.exiting = True
84 self.connector.disconnect()
85 log.info('stopped')
86
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070087 def resolve_endpoint(self, endpoint):
alshabibc3fb4942017-01-26 15:34:24 -080088 # enable optional resolution via consul;
89 # see https://jira.opencord.org/browse/CORD-820
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070090 host, port = endpoint.split(':', 2)
91 return host, int(port)
92
93 @inlineCallbacks
94 def keep_connected(self):
95 """Keep reconnecting to the controller"""
96 while not self.exiting:
97 host, port = self.resolve_endpoint(self.controller_endpoint)
98 log.info('connecting', host=host, port=port)
Girishf6eeaea2017-11-13 10:53:57 +053099 if self.enable_tls:
100 try:
101 # Check that key_file and cert_file is provided and
102 # the files exist
103 if self.key_file is None or \
104 self.cert_file is None or \
105 not os.path.isfile(self.key_file) or \
106 not os.path.isfile(self.cert_file):
107 raise Exception('key_file "{}" or cert_file "{}"'
108 ' is not found'.
109 format(self.key_file, self.cert_file))
110 with open(self.key_file) as keyFile:
111 with open(self.cert_file) as certFile:
112 clientCert = ssl.PrivateCertificate.loadPEM(
113 keyFile.read() + certFile.read())
schowdhury3fbd4702017-06-30 03:50:25 -0700114
Girishf6eeaea2017-11-13 10:53:57 +0530115 ctx = clientCert.options()
116 self.connector = reactor.connectSSL(host, port, self, ctx)
117 log.info('tls-enabled')
schowdhury3fbd4702017-06-30 03:50:25 -0700118
Girishf6eeaea2017-11-13 10:53:57 +0530119 except Exception as e:
120 log.exception('failed-to-connect', reason=e)
121 else:
122 self.connector = reactor.connectTCP(host, port, self)
123 log.info('tls-disabled')
schowdhury3fbd4702017-06-30 03:50:25 -0700124
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700125 self.d_disconnected = Deferred()
126 yield self.d_disconnected
127 log.debug('reconnect', after_delay=self.retry_interval)
128 yield asleep(self.retry_interval)
129
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700130 def enter_disconnected(self, event, reason):
131 """Internally signal entering disconnected state"""
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700132 self.connected = False
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700133 if not self.exiting:
134 log.error(event, reason=reason)
135 self.d_disconnected.callback(None)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700136
137 def enter_connected(self):
138 """Handle transitioning from disconnected to connected state"""
139 log.info('connected')
140 self.connected = True
141 self.read_buffer = None
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700142 reactor.callLater(0, self.proto_handler.start)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700143
144 # protocol.ClientFactory methods
145
146 def protocol(self):
147 cxn = OpenFlowConnection(self) # Low level message handler
148 self.proto_handler = OpenFlowProtocolHandler(
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700149 self.datapath_id, self.device_id, self, cxn, self.rpc_stub)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700150 return cxn
151
152 def clientConnectionFailed(self, connector, reason):
153 self.enter_disconnected('connection-failed', reason)
154
155 def clientConnectionLost(self, connector, reason):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700156 if not self.exiting:
157 log.error('client-connection-lost',
158 reason=reason, connector=connector)
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700159
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700160 def forward_packet_in(self, ofp_packet_in):
Zsolt Haraszti47324632016-12-20 00:50:33 -0800161 if self.proto_handler is not None:
162 self.proto_handler.forward_packet_in(ofp_packet_in)
Zsolt Haraszticd22adc2016-10-25 00:13:06 -0700163
Zsolt Haraszti217a12e2016-12-19 16:37:55 -0800164 def forward_change_event(self, event):
165 # assert isinstance(event, ChangeEvent)
166 log.info('got-change-event', change_event=event)
167 if event.HasField("port_status"):
Zsolt Haraszti47324632016-12-20 00:50:33 -0800168 if self.proto_handler is not None:
169 self.proto_handler.forward_port_status(event.port_status)
Zsolt Haraszti217a12e2016-12-19 16:37:55 -0800170 else:
171 log.error('unknown-change-event', change_event=event)
172
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700173
174if __name__ == '__main__':
175 """Run this to test the agent for N concurrent sessions:
176 python agent [<number-of-desired-instances>]
177 """
178
179 n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
180
181 from utils import mac_str_to_tuple
182
183 class MockRpc(object):
184 @staticmethod
185 def get_port_list(_):
186 ports = []
187 cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
188 for pno, mac, nam, cur, adv, sup, spe in (
189 (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
190 of13.OFPPF_1GB_FD),
191 (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
192 of13.OFPPF_1GB_FD),
193 (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
194 of13.OFPPF_1GB_FD)
195 ):
196 port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
197 curr=cur, advertised=adv,
198 supported=sup,
199 curr_speed=spe, max_speed=spe)
200 ports.append(port)
201 return ports
202
203 stub = MockRpc()
Zsolt Harasztiee5c4c82017-01-09 14:37:57 -0800204 agents = [Agent('localhost:6653', 256 + i, stub).start() for i in range(n)]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700205
206 def shutdown():
207 [a.stop() for a in agents]
208
209 reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
210 reactor.run()
211