blob: bbae0ed054c07b468155be4dd3a9ae8b2fa8f80f [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -070020 "github.com/opencord/voltha-lib-go/v3/pkg/db"
Esin Karamanccb714b2019-11-29 15:02:06 +000021 "time"
22
William Kurkianea869482019-04-09 15:16:11 -040023 "github.com/golang/protobuf/proto"
24 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/any"
26 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000027 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
28 "github.com/opencord/voltha-lib-go/v3/pkg/log"
29 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040030)
31
32type AdapterProxy struct {
npujarec5762e2020-01-01 14:08:48 +053033 kafkaICProxy kafka.InterContainerProxy
William Kurkianea869482019-04-09 15:16:11 -040034 adapterTopic string
35 coreTopic string
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -070036 endpointMgr kafka.EndpointManager
William Kurkianea869482019-04-09 15:16:11 -040037}
38
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -070039func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
40 proxy := AdapterProxy{
41 kafkaICProxy: kafkaProxy,
42 adapterTopic: adapterTopic,
43 coreTopic: coreTopic,
44 endpointMgr: kafka.NewEndpointManager(backend),
45 }
46 logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
William Kurkianea869482019-04-09 15:16:11 -040047 return &proxy
48}
49
50func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
51 msg proto.Message,
52 msgType ic.InterAdapterMessageType_Types,
53 fromAdapter string,
54 toAdapter string,
55 toDeviceId string,
56 proxyDeviceId string,
57 messageId string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +000058 logger.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
William Kurkianea869482019-04-09 15:16:11 -040059 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
60
61 //Marshal the message
62 var marshalledMsg *any.Any
63 var err error
64 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +000065 logger.Warnw("cannot-marshal-msg", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -040066 return err
67 }
68
69 //Build the inter adapter message
70 header := &ic.InterAdapterHeader{
71 Type: msgType,
72 FromTopic: fromAdapter,
73 ToTopic: toAdapter,
74 ToDeviceId: toDeviceId,
75 ProxyDeviceId: proxyDeviceId,
76 }
77 if messageId != "" {
78 header.Id = messageId
79 } else {
80 header.Id = uuid.New().String()
81 }
82 header.Timestamp = time.Now().Unix()
83 iaMsg := &ic.InterAdapterMessage{
84 Header: header,
85 Body: marshalledMsg,
86 }
87 args := make([]*kafka.KVArg, 1)
88 args[0] = &kafka.KVArg{
89 Key: "msg",
90 Value: iaMsg,
91 }
92
93 // Set up the required rpc arguments
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -070094 endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
95 if err != nil {
96 return err
97 }
98 topic := kafka.Topic{Name: string(endpoint)}
Matt Jeanneretcab955f2019-04-10 15:45:57 -040099 replyToTopic := kafka.Topic{Name: fromAdapter}
100 rpc := "process_inter_adapter_message"
William Kurkianea869482019-04-09 15:16:11 -0400101
102 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
Esin Karamanccb714b2019-11-29 15:02:06 +0000103 logger.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
William Kurkianea869482019-04-09 15:16:11 -0400104 return unPackResponse(rpc, "", success, result)
105}