blob: fee70c8ee6ed1163e1889c6ae652f570200686e6 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/proto"
21 "github.com/golang/protobuf/ptypes"
22 "github.com/golang/protobuf/ptypes/any"
23 "github.com/google/uuid"
Scott Baker2c1c4822019-10-16 11:02:41 -070024 "github.com/opencord/voltha-lib-go/pkg/kafka"
Scott Bakere73f91e2019-10-17 12:58:11 -070025 "github.com/opencord/voltha-lib-go/pkg/log"
Scott Baker2c1c4822019-10-16 11:02:41 -070026 ic "github.com/opencord/voltha-protos/go/inter_container"
27 "time"
28)
29
30type AdapterProxy struct {
31 kafkaICProxy *kafka.InterContainerProxy
32 adapterTopic string
33 coreTopic string
34}
35
36func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
37 var proxy AdapterProxy
38 proxy.kafkaICProxy = kafkaProxy
39 proxy.adapterTopic = adapterTopic
40 proxy.coreTopic = coreTopic
41 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
42 return &proxy
43}
44
45func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
46 msg proto.Message,
47 msgType ic.InterAdapterMessageType_Types,
48 fromAdapter string,
49 toAdapter string,
50 toDeviceId string,
51 proxyDeviceId string,
52 messageId string) error {
53 log.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
54 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
55
56 //Marshal the message
57 var marshalledMsg *any.Any
58 var err error
59 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
60 log.Warnw("cannot-marshal-msg", log.Fields{"error": err})
61 return err
62 }
63
64 //Build the inter adapter message
65 header := &ic.InterAdapterHeader{
66 Type: msgType,
67 FromTopic: fromAdapter,
68 ToTopic: toAdapter,
69 ToDeviceId: toDeviceId,
70 ProxyDeviceId: proxyDeviceId,
71 }
72 if messageId != "" {
73 header.Id = messageId
74 } else {
75 header.Id = uuid.New().String()
76 }
77 header.Timestamp = time.Now().Unix()
78 iaMsg := &ic.InterAdapterMessage{
79 Header: header,
80 Body: marshalledMsg,
81 }
82 args := make([]*kafka.KVArg, 1)
83 args[0] = &kafka.KVArg{
84 Key: "msg",
85 Value: iaMsg,
86 }
87
88 // Set up the required rpc arguments
89 topic := kafka.Topic{Name: toAdapter}
90 replyToTopic := kafka.Topic{Name: fromAdapter}
91 rpc := "process_inter_adapter_message"
92
93 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
94 log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
95 return unPackResponse(rpc, "", success, result)
96}