blob: bcc423942cc58154423ecfbc58eb4fc400fe1f21 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
22import arrow
23import structlog
24from google.protobuf.json_format import MessageToJson
25from scapy.packet import Packet
26from twisted.internet.defer import inlineCallbacks, returnValue
27from twisted.python import failure
28from zope.interface import implementer
29
30from adapters.common.event_bus import EventBusClient
31from adapters.common.frameio.frameio import hexify
32from adapters.common.utils.id_generation import create_cluster_logical_device_ids
33from adapters.interface import IAdapterInterface
34from adapters.protos import third_party
35from adapters.protos.device_pb2 import Device, Port, PmConfigs
36from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
37 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
38from adapters.protos.events_pb2 import KpiEvent
39from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
40 LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey, CoreInstance
41from adapters.common.utils.registry import registry, IComponent
42from adapters.common.utils.id_generation import create_cluster_device_id
43import re
44from adapters.interface import ICoreSouthBoundInterface
45from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
46from adapters.protos.common_pb2 import ID
47from google.protobuf.message import Message
48from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
49
50log = structlog.get_logger()
51
52class KafkaMessagingError(BaseException):
53 def __init__(self, error):
54 self.error = error
55
56def wrap_request(return_cls):
57 def real_wrapper(func):
58 @inlineCallbacks
59 def wrapper(*args, **kw):
60 try:
61 (success, d) = yield func(*args, **kw)
62 if success:
63 log.debug("successful-response", func=func, val=d)
64 if return_cls is not None:
65 rc = return_cls()
66 if d is not None:
67 d.Unpack(rc)
68 returnValue(rc)
69 else:
70 log.debug("successful-response-none", func=func,
71 val=None)
72 returnValue(None)
73 else:
74 log.warn("unsuccessful-request", func=func, args=args, kw=kw)
75 returnValue(d)
76 except Exception as e:
77 log.exception("request-wrapper-exception", func=func, e=e)
78 raise
79 return wrapper
80 return real_wrapper
81
82
83@implementer(IComponent, ICoreSouthBoundInterface)
84class CoreProxy(object):
85
86 def __init__(self, kafka_proxy, core_topic, my_listening_topic):
87 self.kafka_proxy = kafka_proxy
88 self.listening_topic = my_listening_topic
89 self.core_topic = core_topic
90 self.default_timeout = 3
91
92 def start(self):
93 log.info('started')
94
95 return self
96
97 def stop(self):
98 log.info('stopped')
99
100 @inlineCallbacks
101 def invoke(self, rpc, to_topic=None, **kwargs):
102 @inlineCallbacks
103 def _send_request(rpc, m_callback,to_topic, **kwargs):
104 try:
105 log.debug("sending-request", rpc=rpc)
106 if to_topic is None:
107 to_topic = self.core_topic
108 result = yield self.kafka_proxy.send_request(rpc=rpc,
109 to_topic=to_topic,
110 reply_topic=self.listening_topic,
111 callback=None,
112 **kwargs)
113 if not m_callback.called:
114 m_callback.callback(result)
115 else:
116 log.debug('timeout-already-occurred', rpc=rpc)
117 except Exception as e:
118 log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
119 if not m_callback.called:
120 m_callback.errback(failure.Failure())
121
122 log.debug('invoke-request', rpc=rpc)
123 cb = DeferredWithTimeout(timeout=self.default_timeout)
124 _send_request(rpc, cb, to_topic, **kwargs)
125 try:
126 res = yield cb
127 returnValue(res)
128 except TimeOutError as e:
129 log.warn('invoke-timeout', e=e)
130 raise e
131
132
133 @wrap_request(CoreInstance)
134 @inlineCallbacks
135 def register(self, adapter):
136 log.debug("register")
137 try:
138 res = yield self.invoke(rpc="Register", adapter=adapter)
139 log.info("registration-returned", res=res)
140 returnValue(res)
141 except Exception as e:
142 log.exception("registration-exception", e=e)
143 raise
144
145 @wrap_request(Device)
146 @inlineCallbacks
147 def get_device(self, device_id):
148 log.debug("get-device")
149 id = ID()
150 id.id = device_id
151 res = yield self.invoke(rpc="GetDevice", device_id=id)
152 returnValue(res)
153
154 @wrap_request(Device)
155 @inlineCallbacks
156 def get_child_device(self, parent_device_id, **kwargs):
157 raise NotImplementedError()
158
159 # def add_device(self, device):
160 # raise NotImplementedError()
161
162 def get_ports(self, device_id, port_type):
163 raise NotImplementedError()
164
165 def get_child_devices(self, parent_device_id):
166 raise NotImplementedError()
167
168 def get_child_device_with_proxy_address(self, proxy_address):
169 raise NotImplementedError()
170
171 def _to_proto(self, **kwargs):
172 encoded = {}
173 for k,v in kwargs.iteritems():
174 if isinstance(v, Message):
175 encoded[k] = v
176 elif type(v) == int:
177 i_proto = IntType()
178 i_proto.val = v
179 encoded[k] = i_proto
180 elif type(v) == str:
181 s_proto = StrType()
182 s_proto.val = v
183 encoded[k] = s_proto
184 elif type(v) == bool:
185 b_proto = BoolType()
186 b_proto.val = v
187 encoded[k] = b_proto
188 return encoded
189
190
191 @wrap_request(None)
192 @inlineCallbacks
193 def child_device_detected(self,
194 parent_device_id,
195 parent_port_no,
196 child_device_type,
197 channel_id,
198 **kw):
199 id = ID()
200 id.id = parent_device_id
201 ppn = IntType()
202 ppn.val = parent_port_no
203 cdt = StrType()
204 cdt.val = child_device_type
205 channel = IntType()
206 channel.val = channel_id
207
208 args = self._to_proto(**kw)
209 res = yield self.invoke(rpc="ChildDeviceDetected",
210 parent_device_id=id,
211 parent_port_no = ppn,
212 child_device_type= cdt,
213 channel_id=channel,
214 **args)
215 returnValue(res)
216
217
218 @wrap_request(None)
219 @inlineCallbacks
220 def device_update(self, device):
221 log.debug("device_update")
222 res = yield self.invoke(rpc="DeviceUpdate", device=device)
223 returnValue(res)
224
225 def child_device_removed(parent_device_id, child_device_id):
226 raise NotImplementedError()
227
228
229 @wrap_request(None)
230 @inlineCallbacks
231 def device_state_update(self, device_id,
232 oper_status=None,
233 connect_status=None):
234
235 id = ID()
236 id.id = device_id
237 o_status = IntType()
238 if oper_status:
239 o_status.val = oper_status
240 else:
241 o_status.val = -1
242 c_status = IntType()
243 if connect_status:
244 c_status.val = connect_status
245 else:
246 c_status.val = -1
247 a_status = IntType()
248
249 res = yield self.invoke(rpc="DeviceStateUpdate",
250 device_id=id,
251 oper_status=o_status,
252 connect_status=c_status)
253 returnValue(res)
254
255 @wrap_request(None)
256 @inlineCallbacks
257 def child_devices_state_update(self, parent_device_id,
258 oper_status=None,
259 connect_status=None,
260 admin_state=None):
261
262 id = ID()
263 id.id = parent_device_id
264 o_status = IntType()
265 if oper_status:
266 o_status.val = oper_status
267 else:
268 o_status.val = -1
269 c_status = IntType()
270 if connect_status:
271 c_status.val = connect_status
272 else:
273 c_status.val = -1
274 a_status = IntType()
275 if admin_state:
276 a_status.val = admin_state
277 else:
278 a_status.val = -1
279
280 res = yield self.invoke(rpc="child_devices_state_update",
281 parent_device_id=id,
282 oper_status=o_status,
283 connect_status=c_status,
284 admin_state=a_status)
285 returnValue(res)
286
287
288 def child_devices_removed(parent_device_id):
289 raise NotImplementedError()
290
291
292 @wrap_request(None)
293 @inlineCallbacks
294 def device_pm_config_update(self, device_pm_config, init=False):
295 log.debug("device_pm_config_update")
296 b = BoolType()
297 b.val = init
298 res = yield self.invoke(rpc="DevicePMConfigUpdate",
299 device_pm_config=device_pm_config, init=b)
300 returnValue(res)
301
302 @wrap_request(None)
303 @inlineCallbacks
304 def port_created(self, device_id, port):
305 log.debug("port_created")
306 proto_id = ID()
307 proto_id.id = device_id
308 res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
309 returnValue(res)
310
311
312 def port_removed(device_id, port):
313 raise NotImplementedError()
314
315 def ports_enabled(device_id):
316 raise NotImplementedError()
317
318 def ports_disabled(device_id):
319 raise NotImplementedError()
320
321 def ports_oper_status_update(device_id, oper_status):
322 raise NotImplementedError()
323
324 def image_download_update(img_dnld):
325 raise NotImplementedError()
326
327 def image_download_deleted(img_dnld):
328 raise NotImplementedError()
329
330 def packet_in(device_id, egress_port_no, packet):
331 raise NotImplementedError()