blob: fad10937032e2dd67caadea2bf9e5388b4b46bcb [file] [log] [blame]
khenaidoo6fdf0ba2018-11-02 14:38:33 -04001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between adapters.
19"""
20
21import structlog
22from uuid import uuid4
23from twisted.internet.defer import inlineCallbacks, returnValue
24from adapters.kafka.container_proxy import ContainerProxy
25from adapters.protos import third_party
26from adapters.protos.core_adapter_pb2 import InterAdapterHeader, \
27 InterAdapterMessage
28import time
29
30_ = third_party
31log = structlog.get_logger()
32
33
class AdapterProxy(ContainerProxy):
    """
    Proxy used by an adapter to send messages directly to another adapter.

    All transport work is delegated to ContainerProxy; this class only
    builds the InterAdapterMessage envelope and invokes the remote
    "process_inter_adapter_message" RPC.
    """

    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
        """
        Forward the constructor arguments unchanged to ContainerProxy.
        """
        super(AdapterProxy, self).__init__(kafka_proxy,
                                           core_topic,
                                           my_listening_topic)

    def _to_string(self, unicode_str):
        """
        Return a value suitable for assignment to a header string field.

        :param unicode_str: The value to convert; may be None.
        :return: "" when the value is None; the value with every non-ASCII
        character dropped, as the native ``str`` type, when the value is
        text; otherwise the value unchanged.
        """
        if unicode_str is None:
            return ""

        # `unicode` only exists on Python 2.  Resolve the text type once so
        # that this helper does not raise NameError on Python 3.
        try:
            text_type = unicode  # noqa: F821 (Python 2 builtin)
        except NameError:
            text_type = str

        # isinstance (rather than an exact type comparison) so subclasses
        # of the text type are converted as well.
        if not isinstance(unicode_str, text_type):
            return unicode_str

        ascii_bytes = unicode_str.encode('ascii', 'ignore')
        if isinstance(ascii_bytes, str):
            # Python 2: the encoded bytes already are the native str.
            return ascii_bytes
        # Python 3: convert the ASCII-only bytes back to a native str.
        return ascii_bytes.decode('ascii')

    # NOTE(review): wrap_request is defined on ContainerProxy, which is not
    # visible in this file; the decorator order is kept exactly as it was.
    @ContainerProxy.wrap_request(None)
    @inlineCallbacks
    def send_inter_adapter_message(self,
                                   msg,
                                   type,
                                   from_adapter,
                                   to_adapter,
                                   to_device_id=None,
                                   proxy_device_id=None,
                                   message_id=None):
        """
        Sends a message directly to an adapter. This is typically used to send
        proxied messages from one adapter to another.  An initial ACK response
        is sent back to the invoking adapter.  If there is subsequent response
        to be sent back (async) then the adapter receiving this request will
        use this same API to send back the async response.

        Any exception raised while validating, building or sending the
        message is logged as "error-sending-request" and is not re-raised;
        in that case this generator finishes without calling returnValue.

        :param msg: GRPC message to send.  Required.
        :param type: InterAdapterMessageType of the message to send.  (The
        name shadows the builtin; it is kept because callers may pass it by
        keyword.)
        :param from_adapter: Name of the adapter making the request.
        Required.
        :param to_adapter: Name of the remote adapter.  Required.
        :param to_device_id: The ID of the device for which the message is
        intended.  If it's None then the message is not intended for a
        specific device.  Its interpretation is adapter specific.
        :param proxy_device_id: The ID of the device which will proxy that
        message.  If it's None then there is no specific device to proxy the
        message.  Its interpretation is adapter specific.
        :param message_id: A unique number for this transaction that the
        adapter may use to correlate a request and an async response.  When
        it is empty or None a random uuid4 hex string is generated.
        :return: (via returnValue) the result of the
        "process_inter_adapter_message" invocation.
        """

        try:
            # Validate params.  Explicit raises are used instead of assert
            # so the checks still run under `python -O`; the exception is
            # handled by the except clause below like any other failure.
            if not msg:
                raise ValueError("msg is required")
            if not from_adapter:
                raise ValueError("from_adapter is required")
            if not to_adapter:
                raise ValueError("to_adapter is required")

            # Build the inter adapter message header
            header = InterAdapterHeader()
            header.type = type
            header.from_topic = self._to_string(from_adapter)
            header.to_topic = self._to_string(to_adapter)
            header.to_device_id = self._to_string(to_device_id)
            header.proxy_device_id = self._to_string(proxy_device_id)

            if message_id:
                header.id = self._to_string(message_id)
            else:
                header.id = uuid4().hex

            # Current wall-clock time in milliseconds
            header.timestamp = int(round(time.time() * 1000))

            ia_msg = InterAdapterMessage()
            ia_msg.header.CopyFrom(header)
            ia_msg.body.Pack(msg)

            log.debug("sending-inter-adapter-message", header=ia_msg.header)
            res = yield self.invoke(rpc="process_inter_adapter_message",
                                    to_topic=ia_msg.header.to_topic,
                                    msg=ia_msg)
            returnValue(res)
        except Exception as e:
            log.exception("error-sending-request", e=e)