blob: b30221475748d696f88238f65f4f8fa8b2590ed9 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package common
17
18import (
19 "context"
Esin Karamanccb714b2019-11-29 15:02:06 +000020 "time"
21
William Kurkianea869482019-04-09 15:16:11 -040022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
Esin Karamanccb714b2019-11-29 15:02:06 +000026 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v3/pkg/log"
28 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
William Kurkianea869482019-04-09 15:16:11 -040029)
30
31type AdapterProxy struct {
32 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35}
36
37func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
38 var proxy AdapterProxy
39 proxy.kafkaICProxy = kafkaProxy
40 proxy.adapterTopic = adapterTopic
41 proxy.coreTopic = coreTopic
Esin Karamanccb714b2019-11-29 15:02:06 +000042 logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
William Kurkianea869482019-04-09 15:16:11 -040043 return &proxy
44}
45
46func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
47 msg proto.Message,
48 msgType ic.InterAdapterMessageType_Types,
49 fromAdapter string,
50 toAdapter string,
51 toDeviceId string,
52 proxyDeviceId string,
53 messageId string) error {
Esin Karamanccb714b2019-11-29 15:02:06 +000054 logger.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
William Kurkianea869482019-04-09 15:16:11 -040055 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
56
57 //Marshal the message
58 var marshalledMsg *any.Any
59 var err error
60 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
Esin Karamanccb714b2019-11-29 15:02:06 +000061 logger.Warnw("cannot-marshal-msg", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -040062 return err
63 }
64
65 //Build the inter adapter message
66 header := &ic.InterAdapterHeader{
67 Type: msgType,
68 FromTopic: fromAdapter,
69 ToTopic: toAdapter,
70 ToDeviceId: toDeviceId,
71 ProxyDeviceId: proxyDeviceId,
72 }
73 if messageId != "" {
74 header.Id = messageId
75 } else {
76 header.Id = uuid.New().String()
77 }
78 header.Timestamp = time.Now().Unix()
79 iaMsg := &ic.InterAdapterMessage{
80 Header: header,
81 Body: marshalledMsg,
82 }
83 args := make([]*kafka.KVArg, 1)
84 args[0] = &kafka.KVArg{
85 Key: "msg",
86 Value: iaMsg,
87 }
88
89 // Set up the required rpc arguments
Matt Jeanneretcab955f2019-04-10 15:45:57 -040090 topic := kafka.Topic{Name: toAdapter}
91 replyToTopic := kafka.Topic{Name: fromAdapter}
92 rpc := "process_inter_adapter_message"
William Kurkianea869482019-04-09 15:16:11 -040093
94 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
Esin Karamanccb714b2019-11-29 15:02:06 +000095 logger.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
William Kurkianea869482019-04-09 15:16:11 -040096 return unPackResponse(rpc, "", success, result)
97}