blob: 55c8bfbae0080af47747f83adf32593523ea5996 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17import sys
18
19import structlog
20from twisted.internet import protocol
21from twisted.internet import reactor
22from twisted.internet.defer import Deferred, inlineCallbacks
23
24import loxi.of13 as of13
25from common.utils.asleep import asleep
26from of_connection import OpenFlowConnection
27from of_protocol_handler import OpenFlowProtocolHandler
Khen Nursimulu7626ce12016-12-21 11:51:46 -050028# from ofagent.protos.openflow_13_pb2 import ChangeEvent
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029
30log = structlog.get_logger()
31
32
class Agent(protocol.ClientFactory):
    """Twisted client factory that keeps a TCP session to a controller alive.

    The agent repeatedly connects to ``controller_endpoint``; each new
    connection gets an ``OpenFlowConnection`` paired with an
    ``OpenFlowProtocolHandler``.  When the connection is reported as
    disconnected, the agent waits ``conn_retry_interval`` seconds and
    connects again, until ``stop()`` is called.
    """

    # Class-level state; not referenced inside this class.
    # NOTE(review): presumably shared generation-id bookkeeping used by the
    # protocol handler -- confirm against of_protocol_handler.
    generation_is_defined = False
    cached_generation_id = None

    def __init__(self,
                 controller_endpoint,
                 datapath_id,
                 device_id,
                 rpc_stub,
                 conn_retry_interval=1):
        """Store the connection parameters; no I/O is performed here.

        :param controller_endpoint: ``'<host>:<port>'`` string of the
            controller to connect to.
        :param datapath_id: passed through to the protocol handler.
        :param device_id: passed through to the protocol handler and
            returned by ``get_device_id()``.
        :param rpc_stub: passed through to the protocol handler.
        :param conn_retry_interval: delay handed to ``asleep`` between a
            disconnect and the next connection attempt.
        """
        self.controller_endpoint = controller_endpoint
        self.datapath_id = datapath_id
        self.device_id = device_id
        self.rpc_stub = rpc_stub
        self.retry_interval = conn_retry_interval

        self.running = False
        self.connector = None  # will be a Connector instance once connected
        self.d_disconnected = None  # a deferred to signal reconnect loop when
                                    # TCP connection is lost
        self.connected = False
        self.exiting = False
        self.proto_handler = None

    def get_device_id(self):
        """Return the device id this agent was created with."""
        return self.device_id

    def start(self):
        """Schedule the reconnect loop on the reactor and return ``self``.

        Calling ``start()`` on an already running agent is a no-op; ``self``
        is returned in that case too so the call can always be chained.
        """
        log.debug('starting')
        if self.running:
            return self
        self.running = True
        reactor.callLater(0, self.keep_connected)
        log.info('started')
        return self

    def stop(self):
        """Mark the agent as exiting and drop the current connection, if any."""
        log.debug('stopping')
        self.connected = False
        self.exiting = True
        # The connector only exists once keep_connected() has run at least
        # once; stop() may legitimately be called before that happens.
        if self.connector is not None:
            self.connector.disconnect()
        log.info('stopped')

    def resolve_endpoint(self, endpoint):
        """Split ``'<host>:<port>'`` into a ``(host, int(port))`` tuple.

        The port is taken after the last colon, so a host part that itself
        contains colons does not break the unpacking.

        :raises ValueError: if ``endpoint`` has no colon or the port is not
            an integer.
        """
        # enable optional resolution via consul;
        # see https://jira.opencord.org/browse/CORD-820
        host, port = endpoint.rsplit(':', 1)
        return host, int(port)

    @inlineCallbacks
    def keep_connected(self):
        """Keep reconnecting to the controller"""
        while not self.exiting:
            host, port = self.resolve_endpoint(self.controller_endpoint)
            log.info('connecting', host=host, port=port)
            self.connector = reactor.connectTCP(host, port, self)
            # Fired by enter_disconnected(); parks the loop until the
            # current attempt/connection is over.
            self.d_disconnected = Deferred()
            yield self.d_disconnected
            log.debug('reconnect', after_delay=self.retry_interval)
            yield asleep(self.retry_interval)

    def enter_disconnected(self, event, reason):
        """Internally signal entering disconnected state"""
        self.connected = False
        if not self.exiting:
            log.error(event, reason=reason)
        # Wake up keep_connected() regardless of whether we are exiting, so
        # the loop can observe the flag and terminate.
        self.d_disconnected.callback(None)

    def enter_connected(self):
        """Handle transitioning from disconnected to connected state"""
        log.info('connected')
        self.connected = True
        self.read_buffer = None
        reactor.callLater(0, self.proto_handler.start)

    # protocol.ClientFactory methods

    def protocol(self):
        """Build the connection object and its protocol handler.

        The handler is stored on ``self.proto_handler``; the low-level
        ``OpenFlowConnection`` is returned.
        """
        cxn = OpenFlowConnection(self)  # Low level message handler
        self.proto_handler = OpenFlowProtocolHandler(
            self.datapath_id, self.device_id, self, cxn, self.rpc_stub)
        return cxn

    def clientConnectionFailed(self, connector, reason):
        """Treat a failed connection attempt as a disconnect (triggers retry)."""
        self.enter_disconnected('connection-failed', reason)

    def clientConnectionLost(self, connector, reason):
        """Log a lost connection unless the agent is shutting down.

        NOTE(review): this does not call enter_disconnected(); presumably
        OpenFlowConnection signals the disconnect itself -- confirm.
        """
        if not self.exiting:
            log.error('client-connection-lost',
                      reason=reason, connector=connector)

    def forward_packet_in(self, ofp_packet_in):
        """Hand a packet-in to the protocol handler, if one exists yet."""
        if self.proto_handler is not None:
            self.proto_handler.forward_packet_in(ofp_packet_in)

    def forward_change_event(self, event):
        """Forward a change event; only ``port_status`` events are supported.

        Events carrying any other field are logged as an error.
        """
        # assert isinstance(event, ChangeEvent)
        log.info('got-change-event', change_event=event)
        if event.HasField("port_status"):
            if self.proto_handler is not None:
                self.proto_handler.forward_port_status(event.port_status)
        else:
            log.error('unknown-change-event', change_event=event)
138
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -0700139
if __name__ == '__main__':
    # Run this to test the agent for N concurrent sessions:
    #     python agent [<number-of-desired-instances>]

    n = 1 if len(sys.argv) < 2 else int(sys.argv[1])

    from utils import mac_str_to_tuple

    class MockRpc(object):
        """Stand-in RPC stub that serves a fixed list of three ports."""

        @staticmethod
        def get_port_list(_):
            """Return port descriptors for two ONU ports and one OLT port."""
            cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
            speed = of13.OFPPF_1GB_FD
            # (port number, MAC address, port name)
            port_specs = (
                (1, '00:00:00:00:00:01', 'onu1'),
                (2, '00:00:00:00:00:02', 'onu2'),
                (129, '00:00:00:00:00:81', 'olt'),
            )
            return [
                of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
                                      curr=cap, advertised=cap,
                                      supported=cap,
                                      curr_speed=speed, max_speed=speed)
                for pno, mac, nam in port_specs
            ]

    stub = MockRpc()
    # Agent() takes (controller_endpoint, datapath_id, device_id, rpc_stub);
    # the device id below is a placeholder for this manual test only.
    # NOTE(review): adjust if the protocol handler expects a specific format.
    agents = [
        Agent('localhost:6653', 256 + i, 'mock-device-%d' % i, stub).start()
        for i in range(n)
    ]

    def shutdown():
        """Stop every agent before the reactor shuts down."""
        for agent in agents:
            agent.stop()

    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
    reactor.run()
176 reactor.run()
177