blob: cd5750fe2009d6fb53b5184dc93fd642b305f49f [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/opencord/voltha-lib-go/v3/pkg/db"
21
22 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v3/pkg/log"
28 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
29)
30
31type AdapterProxy struct {
32 kafkaICProxy kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 endpointMgr kafka.EndpointManager
36}
37
38func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
39 proxy := AdapterProxy{
40 kafkaICProxy: kafkaProxy,
41 adapterTopic: adapterTopic,
42 coreTopic: coreTopic,
43 endpointMgr: kafka.NewEndpointManager(backend),
44 }
45 logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
46 return &proxy
47}
48
49func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
50 msg proto.Message,
51 msgType ic.InterAdapterMessageType_Types,
52 fromAdapter string,
53 toAdapter string,
54 toDeviceId string,
55 proxyDeviceId string,
56 messageId string) error {
57 logger.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
58 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
59
60 //Marshal the message
61 var marshalledMsg *any.Any
62 var err error
63 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
64 logger.Warnw("cannot-marshal-msg", log.Fields{"error": err})
65 return err
66 }
67
68 //Build the inter adapter message
69 header := &ic.InterAdapterHeader{
70 Type: msgType,
71 FromTopic: fromAdapter,
72 ToTopic: toAdapter,
73 ToDeviceId: toDeviceId,
74 ProxyDeviceId: proxyDeviceId,
75 }
76 if messageId != "" {
77 header.Id = messageId
78 } else {
79 header.Id = uuid.New().String()
80 }
81 header.Timestamp = ptypes.TimestampNow()
82 iaMsg := &ic.InterAdapterMessage{
83 Header: header,
84 Body: marshalledMsg,
85 }
86 args := make([]*kafka.KVArg, 1)
87 args[0] = &kafka.KVArg{
88 Key: "msg",
89 Value: iaMsg,
90 }
91
92 // Set up the required rpc arguments
93 endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
94 if err != nil {
95 return err
96 }
97 topic := kafka.Topic{Name: string(endpoint)}
98 replyToTopic := kafka.Topic{Name: fromAdapter}
99 rpc := "process_inter_adapter_message"
100
101 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
102 logger.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
103 return unPackResponse(rpc, "", success, result)
104}