blob: e4d415ddec6a2b99cfad28f3e904d62c27b79922 [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import sys
18
19import structlog
20import os.path
21from twisted.internet import protocol, reactor, ssl
22from twisted.internet.defer import Deferred, inlineCallbacks
23
24import loxi.of13 as of13
William Kurkianfc0dcda2019-04-08 16:54:36 -040025from pyvoltha.common.utils.asleep import asleep
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050026from of_connection import OpenFlowConnection
27from of_protocol_handler import OpenFlowProtocolHandler
28
29log = structlog.get_logger()
30
31
32class Agent(protocol.ClientFactory):
33
34 generation_is_defined = False
35 cached_generation_id = None
36
37 def __init__(self,
David Bainbridge006dc842019-11-22 02:05:32 +000038 connection_manager,
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050039 controller_endpoint,
40 datapath_id,
41 device_id,
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050042 enable_tls=False,
43 key_file=None,
44 cert_file=None,
45 conn_retry_interval=1):
46
47 self.controller_endpoint = controller_endpoint
48 self.datapath_id = datapath_id
49 self.device_id = device_id
David Bainbridge006dc842019-11-22 02:05:32 +000050 self.connection_manager = connection_manager
Stephane Barbarie6e1bd502018-11-05 22:44:45 -050051 self.enable_tls = enable_tls
52 self.key_file = key_file
53 self.cert_file = cert_file
54 self.retry_interval = conn_retry_interval
55
56 self.running = False
57 self.connector = None # will be a Connector instance once connected
58 self.d_disconnected = None # a deferred to signal reconnect loop when
59 # TCP connection is lost
60 self.connected = False
61 self.exiting = False
62 self.proto_handler = None
63
64 def get_device_id(self):
65 return self.device_id
66
67 def start(self):
68 log.debug('starting')
69 if self.running:
70 return
71 self.running = True
72 reactor.callLater(0, self.keep_connected)
73 log.info('started')
74 return self
75
76 def stop(self):
77 log.debug('stopping')
78 self.connected = False
79 self.exiting = True
80 self.connector.disconnect()
81 log.info('stopped')
82
83 def resolve_endpoint(self, endpoint):
84 # enable optional resolution via consul;
85 # see https://jira.opencord.org/browse/CORD-820
86 host, port = endpoint.split(':', 2)
87 return host, int(port)
88
89 @inlineCallbacks
90 def keep_connected(self):
91 """Keep reconnecting to the controller"""
92 while not self.exiting:
93 host, port = self.resolve_endpoint(self.controller_endpoint)
94 log.info('connecting', host=host, port=port)
95 if self.enable_tls:
96 try:
97 # Check that key_file and cert_file is provided and
98 # the files exist
99 if self.key_file is None or \
100 self.cert_file is None or \
101 not os.path.isfile(self.key_file) or \
102 not os.path.isfile(self.cert_file):
103 raise Exception('key_file "{}" or cert_file "{}"'
104 ' is not found'.
105 format(self.key_file, self.cert_file))
106 with open(self.key_file) as keyFile:
107 with open(self.cert_file) as certFile:
108 clientCert = ssl.PrivateCertificate.loadPEM(
109 keyFile.read() + certFile.read())
110
111 ctx = clientCert.options()
112 self.connector = reactor.connectSSL(host, port, self, ctx)
113 log.info('tls-enabled')
114
115 except Exception as e:
116 log.exception('failed-to-connect', reason=e)
117 else:
118 self.connector = reactor.connectTCP(host, port, self)
119 log.info('tls-disabled')
120
121 self.d_disconnected = Deferred()
122 yield self.d_disconnected
123 log.debug('reconnect', after_delay=self.retry_interval)
124 yield asleep(self.retry_interval)
125
126 def enter_disconnected(self, event, reason):
127 """Internally signal entering disconnected state"""
128 self.connected = False
129 if not self.exiting:
130 log.error(event, reason=reason)
131 self.d_disconnected.callback(None)
132
133 def enter_connected(self):
134 """Handle transitioning from disconnected to connected state"""
135 log.info('connected')
136 self.connected = True
137 self.read_buffer = None
138 reactor.callLater(0, self.proto_handler.start)
139
140 # protocol.ClientFactory methods
141
142 def protocol(self):
143 cxn = OpenFlowConnection(self) # Low level message handler
144 self.proto_handler = OpenFlowProtocolHandler(
David Bainbridge006dc842019-11-22 02:05:32 +0000145 self.datapath_id, self.device_id, self, cxn)
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500146 return cxn
147
148 def clientConnectionFailed(self, connector, reason):
149 self.enter_disconnected('connection-failed', reason)
150
151 def clientConnectionLost(self, connector, reason):
152 if not self.exiting:
153 log.error('client-connection-lost',
154 reason=reason, connector=connector)
155
156 def forward_packet_in(self, ofp_packet_in):
157 if self.proto_handler is not None:
158 self.proto_handler.forward_packet_in(ofp_packet_in)
159
160 def forward_change_event(self, event):
161 # assert isinstance(event, ChangeEvent)
162 log.info('got-change-event', change_event=event)
163 if event.HasField("port_status"):
164 if self.proto_handler is not None:
165 self.proto_handler.forward_port_status(event.port_status)
166 else:
167 log.error('unknown-change-event', change_event=event)
168
Stephane Barbarie6e1bd502018-11-05 22:44:45 -0500169if __name__ == '__main__':
170 """Run this to test the agent for N concurrent sessions:
171 python agent [<number-of-desired-instances>]
172 """
173
174 n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
175
176 from utils import mac_str_to_tuple
177
178 class MockRpc(object):
179 @staticmethod
180 def get_port_list(_):
181 ports = []
182 cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
183 for pno, mac, nam, cur, adv, sup, spe in (
184 (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
185 of13.OFPPF_1GB_FD),
186 (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
187 of13.OFPPF_1GB_FD),
188 (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
189 of13.OFPPF_1GB_FD)
190 ):
191 port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
192 curr=cur, advertised=adv,
193 supported=sup,
194 curr_speed=spe, max_speed=spe)
195 ports.append(port)
196 return ports
197
198 stub = MockRpc()
199 agents = [Agent('localhost:6653', 256 + i, stub).start() for i in range(n)]
200
201 def shutdown():
202 [a.stop() for a in agents]
203
204 reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
205 reactor.run()