blob: fe148083c40733b53d43f2b7ae00e4365288931b [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between adapters.
19"""
20
Zack Williams84a71e92019-11-15 09:00:19 -070021from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060022import structlog
23from uuid import uuid4
24from twisted.internet.defer import inlineCallbacks, returnValue
Zack Williams84a71e92019-11-15 09:00:19 -070025from .container_proxy import ContainerProxy
William Kurkianede82e92019-03-05 13:02:57 -050026from voltha_protos.inter_container_pb2 import InterAdapterHeader, \
Chip Boling67b674a2019-02-08 11:42:18 -060027 InterAdapterMessage
28import time
Zack Williams84a71e92019-11-15 09:00:19 -070029import codecs
30import six
Chip Boling67b674a2019-02-08 11:42:18 -060031
# Module-level structlog logger used by AdapterProxy for debug and error
# reporting.
log = structlog.get_logger()
33
34
class AdapterProxy(ContainerProxy):
    """
    Proxy used to send inter-adapter messages to a remote adapter.

    Messages are wrapped in an ``InterAdapterMessage`` envelope and handed to
    ``invoke`` (inherited from ``ContainerProxy``) for delivery on
    ``self.remote_topic``.
    """

    def __init__(self, kafka_proxy, adapter_topic, my_listening_topic):
        """
        Forward all arguments, unchanged and in order, to ``ContainerProxy``.

        :param kafka_proxy: messaging proxy handed to the base class.
        :param adapter_topic: topic handed to the base class (presumably the
        remote adapter's topic, later exposed as ``remote_topic`` -- confirm
        against ``ContainerProxy``).
        :param my_listening_topic: topic handed to the base class.
        """
        super(AdapterProxy, self).__init__(kafka_proxy,
                                           adapter_topic,
                                           my_listening_topic)

    def _to_string(self, unicode_str):
        """
        Normalise a value so it can be stored in a protobuf string field.

        :param unicode_str: value to normalise; may be None.
        :return: None when the input is None; the input itself when it is
        already a string type; decoded text when it is a byte string.
        :raises UnicodeDecodeError: if a byte string holds non-ASCII data.
        """
        if unicode_str is None:
            return None

        if isinstance(unicode_str, six.string_types):
            return unicode_str

        if isinstance(unicode_str, six.binary_type):
            # Only reachable on Python 3 (on Python 2 a byte string is already
            # matched by six.string_types above). codecs.encode() rejects
            # bytes there, so decode to text instead.
            return unicode_str.decode('ascii')

        # Any other type keeps the original handling.
        return codecs.encode(unicode_str, 'ascii')

    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def send_inter_adapter_message(self,
                                   msg,
                                   type,
                                   from_adapter,
                                   to_adapter,
                                   to_device_id=None,
                                   proxy_device_id=None,
                                   message_id=None):
        """
        Sends a message directly to an adapter. This is typically used to send
        proxied messages from one adapter to another. An initial ACK response
        is sent back to the invoking adapter. If there is a subsequent
        response to be sent back (async) then the adapter receiving this
        request will use this same API to send back the async response.

        Any exception raised while building or sending the message is logged
        as "error-sending-request" and is not re-raised from this body.

        :param msg: GRPC message to send; it is packed into the message body.
        :param type: InterAdapterMessageType of the message to send.
        :param from_adapter: Name of the adapter making the request; stored
        as the header's ``from_topic``.
        :param to_adapter: Name of the remote adapter. It must be non-empty,
        but the destination topic actually used is ``self.remote_topic``.
        :param to_device_id: The ID of the device for which the message is
        intended. If it's None then the message is not intended for a
        specific device. Its interpretation is adapter specific.
        :param proxy_device_id: The ID of the device which will proxy that
        message. If it's None then there is no specific device to proxy the
        message. Its interpretation is adapter specific.
        :param message_id: A unique number for this transaction that the
        adapter may use to correlate a request and an async response. When
        omitted, a random UUID hex string is generated.
        :return: (via ``returnValue``) the result of the
        ``process_inter_adapter_message`` invocation.
        """

        try:
            # Validate params explicitly: ``assert`` statements are removed
            # when Python runs with -O, which would silently skip the checks.
            if not msg:
                raise ValueError("msg is required")
            if not from_adapter:
                raise ValueError("from_adapter is required")
            if not to_adapter:
                raise ValueError("to_adapter is required")

            # Build the inter adapter message
            h = InterAdapterHeader()
            h.type = type
            h.from_topic = self._to_string(from_adapter)
            h.to_topic = self.remote_topic

            # Optional identifiers: leave the field at its protobuf default
            # when no value was supplied rather than assigning None.
            # NOTE(review): protobuf runtimes are expected to reject None for
            # scalar string fields -- confirm for the version in use.
            device_id = self._to_string(to_device_id)
            if device_id is not None:
                h.to_device_id = device_id

            proxy_id = self._to_string(proxy_device_id)
            if proxy_id is not None:
                h.proxy_device_id = proxy_id

            if message_id:
                h.id = self._to_string(message_id)
            else:
                h.id = uuid4().hex

            h.timestamp.GetCurrentTime()
            iaMsg = InterAdapterMessage()
            iaMsg.header.CopyFrom(h)
            iaMsg.body.Pack(msg)

            log.debug("sending-inter-adapter-message",
                      type=iaMsg.header.type,
                      from_topic=iaMsg.header.from_topic,
                      to_topic=iaMsg.header.to_topic,
                      to_device_id=iaMsg.header.to_device_id)
            res = yield self.invoke(rpc="process_inter_adapter_message",
                                    to_topic=iaMsg.header.to_topic,
                                    msg=iaMsg)
            returnValue(res)
        except Exception as e:
            log.exception("error-sending-request", e=e)