blob: d2ecd0c7251890a4ffec57d842380f9a6a0a2fde [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19import structlog
20from twisted.internet import protocol
21from twisted.internet import reactor
schowdhury3fbd4702017-06-30 03:50:25 -070022from twisted.internet import reactor, ssl
23from twisted.internet import reactor
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070024from twisted.internet.defer import Deferred, inlineCallbacks
25
26import loxi.of13 as of13
27from common.utils.asleep import asleep
28from of_connection import OpenFlowConnection
29from of_protocol_handler import OpenFlowProtocolHandler
Khen Nursimulu7626ce12016-12-21 11:51:46 -050030# from ofagent.protos.openflow_13_pb2 import ChangeEvent
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070031
32log = structlog.get_logger()
33
34
class Agent(protocol.ClientFactory):
    """OpenFlow agent for a single device.

    Serves as the Twisted client factory for the SSL connection to the
    controller named by ``controller_endpoint`` and keeps re-establishing
    that connection (see ``keep_connected``) until ``stop`` is called.
    Packet-in messages and change events are relayed to the protocol
    handler of the most recently built connection.
    """

    # Class-level (shared by all Agent instances). Not read or written
    # inside this class.
    # NOTE(review): presumably maintained by the protocol handler for
    # OpenFlow role-request generation ids -- confirm against its users.
    generation_is_defined = False
    cached_generation_id = None

    def __init__(self,
                 controller_endpoint,
                 datapath_id,
                 device_id,
                 rpc_stub,
                 conn_retry_interval=1):
        """Store the configuration; no connection is attempted here.

        :param controller_endpoint: controller address as "<host>:<port>"
        :param datapath_id: passed on to the protocol handler
        :param device_id: passed on to the protocol handler
        :param rpc_stub: passed on to the protocol handler
        :param conn_retry_interval: delay handed to ``asleep`` between
            two connection attempts
        """
        self.controller_endpoint = controller_endpoint
        self.datapath_id = datapath_id
        self.device_id = device_id
        self.rpc_stub = rpc_stub
        self.retry_interval = conn_retry_interval

        self.running = False
        self.connector = None  # will be a Connector instance once connected
        self.d_disconnected = None  # a deferred to signal reconnect loop when
                                    # TCP connection is lost
        self.connected = False
        self.exiting = False
        self.proto_handler = None

    def get_device_id(self):
        """Return the device id given at construction time."""
        return self.device_id

    def start(self):
        """Schedule the reconnect loop on the reactor.

        Calling it again while already running is a no-op.

        :return: this agent, on both paths, so that calls can be chained
        """
        log.debug('starting')
        if self.running:
            return self
        self.running = True
        reactor.callLater(0, self.keep_connected)
        log.info('started')
        return self

    def stop(self):
        """Flag the agent as exiting and drop the current connection."""
        log.debug('stopping')
        self.connected = False
        self.exiting = True
        # stop() may run before the first connection attempt created a
        # connector (or after every attempt failed to start).
        if self.connector is not None:
            self.connector.disconnect()
        log.info('stopped')

    def resolve_endpoint(self, endpoint):
        """Split a "<host>:<port>" string into ``(host, int(port))``.

        The split is done on the last colon, so the host part may itself
        contain colons.

        :raises ValueError: if there is no colon or the port is not an
            integer
        """
        # TODO: enable optional resolution via consul;
        # see https://jira.opencord.org/browse/CORD-820
        host, port = endpoint.rsplit(':', 1)
        return host, int(port)

    @inlineCallbacks
    def keep_connected(self):
        """Keep reconnecting to the controller"""
        while not self.exiting:
            host, port = self.resolve_endpoint(self.controller_endpoint)
            log.info('connecting', host=host, port=port)

            # Arm the signal before starting the attempt so that a failure
            # reported at any point finds a deferred to fire.
            self.d_disconnected = d_disconnected = Deferred()
            try:
                with open("/ofagent/pki/voltha.key") as keyFile:
                    with open("/ofagent/pki/voltha.crt") as certFile:
                        clientCert = ssl.PrivateCertificate.loadPEM(
                            keyFile.read() + certFile.read())

                ctx = clientCert.options()
                self.connector = reactor.connectSSL(host, port, self, ctx)

            except Exception as e:
                log.exception('failed-to-connect', reason=e)
                # No attempt is in flight, hence nothing would ever fire
                # the deferred; skip the wait and go straight to the retry
                # delay instead of blocking this loop forever.
                self.d_disconnected = None
            else:
                yield d_disconnected

            log.debug('reconnect', after_delay=self.retry_interval)
            yield asleep(self.retry_interval)

    def enter_disconnected(self, event, reason):
        """Internally signal entering disconnected state

        :param event: log event name describing how the connection ended
        :param reason: failure details, logged alongside ``event``
        """
        self.connected = False
        if not self.exiting:
            log.error(event, reason=reason)
            pending = self.d_disconnected
            # Wake the reconnect loop at most once per attempt; firing a
            # deferred twice raises AlreadyCalledError.
            if pending is not None and not pending.called:
                pending.callback(None)

    def enter_connected(self):
        """Handle transitioning from disconnected to connected state"""
        # NOTE(review): no caller is visible in this file; presumably
        # invoked by the connection object once the transport is up.
        log.info('connected')
        self.connected = True
        self.read_buffer = None
        reactor.callLater(0, self.proto_handler.start)

    # protocol.ClientFactory methods

    def protocol(self):
        """Build the connection object and its protocol handler.

        The handler is kept in ``self.proto_handler``; the low-level
        connection is returned.
        """
        cxn = OpenFlowConnection(self)  # Low level message handler
        self.proto_handler = OpenFlowProtocolHandler(
            self.datapath_id, self.device_id, self, cxn, self.rpc_stub)
        return cxn

    def clientConnectionFailed(self, connector, reason):
        """Treat a failed connection attempt as a disconnect."""
        self.enter_disconnected('connection-failed', reason)

    def clientConnectionLost(self, connector, reason):
        """Log the loss of an established connection unless exiting.

        This only logs; it does not wake the reconnect loop itself.
        NOTE(review): presumably the connection object calls
        ``enter_disconnected`` on its own when it is lost -- confirm.
        """
        if not self.exiting:
            log.error('client-connection-lost',
                      reason=reason, connector=connector)

    def forward_packet_in(self, ofp_packet_in):
        """Relay a packet-in to the protocol handler, if one exists."""
        if self.proto_handler is not None:
            self.proto_handler.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, event):
        """Relay the port status of a change event to the protocol handler.

        Events without a "port_status" field are logged as errors.
        """
        # assert isinstance(event, ChangeEvent)
        log.info('got-change-event', change_event=event)
        if event.HasField("port_status"):
            if self.proto_handler is not None:
                self.proto_handler.forward_port_status(event.port_status)
        else:
            log.error('unknown-change-event', change_event=event)
152
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700153
if __name__ == '__main__':
    # Run this to test the agent for N concurrent sessions:
    #     python agent [<number-of-desired-instances>]

    n = 1 if len(sys.argv) < 2 else int(sys.argv[1])

    from utils import mac_str_to_tuple

    class MockRpc(object):
        """Stand-in RPC stub that serves a fixed list of three ports."""

        @staticmethod
        def get_port_list(_):
            """Return port descriptors for 'onu1', 'onu2' and 'olt'."""
            ports = []
            cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
            for pno, mac, nam, cur, adv, sup, spe in (
                    (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
                     of13.OFPPF_1GB_FD),
                    (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
                     of13.OFPPF_1GB_FD),
                    (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
                     of13.OFPPF_1GB_FD)
            ):
                port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
                                             curr=cur, advertised=adv,
                                             supported=sup,
                                             curr_speed=spe, max_speed=spe)
                ports.append(port)
            return ports

    stub = MockRpc()
    # Agent requires (controller_endpoint, datapath_id, device_id, rpc_stub);
    # the device id was missing here, which made the constructor raise
    # TypeError. The value below is a placeholder for this mock setup.
    agents = [
        Agent('localhost:6653', 256 + i, 'mock-device-%d' % i, stub).start()
        for i in range(n)
    ]

    def shutdown():
        """Stop every agent before the reactor shuts down."""
        for a in agents:
            a.stop()

    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
    reactor.run()
191