blob: 8588fe4717b1735e35cde5f0851d419a7c8d1831 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package common
17
18import (
19 "context"
Matteo Scandolod525ae32020-04-02 17:27:29 -070020 "github.com/opencord/voltha-lib-go/v3/pkg/db"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021
khenaidood2b6df92018-12-13 16:37:20 -050022 "github.com/golang/protobuf/proto"
23 "github.com/golang/protobuf/ptypes"
24 "github.com/golang/protobuf/ptypes/any"
25 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080026 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v3/pkg/log"
28 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
khenaidood2b6df92018-12-13 16:37:20 -050029)
30
31type AdapterProxy struct {
npujar467fe752020-01-16 20:17:45 +053032 kafkaICProxy kafka.InterContainerProxy
khenaidood2b6df92018-12-13 16:37:20 -050033 coreTopic string
Matteo Scandolod525ae32020-04-02 17:27:29 -070034 endpointMgr kafka.EndpointManager
khenaidood2b6df92018-12-13 16:37:20 -050035}
36
Girish Kumardef46fc2020-08-05 18:20:11 +000037func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
Matteo Scandolod525ae32020-04-02 17:27:29 -070038 proxy := AdapterProxy{
39 kafkaICProxy: kafkaProxy,
Matteo Scandolod525ae32020-04-02 17:27:29 -070040 coreTopic: coreTopic,
41 endpointMgr: kafka.NewEndpointManager(backend),
42 }
Girish Kumardef46fc2020-08-05 18:20:11 +000043 logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
khenaidood2b6df92018-12-13 16:37:20 -050044 return &proxy
45}
46
47func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
48 msg proto.Message,
49 msgType ic.InterAdapterMessageType_Types,
50 fromAdapter string,
51 toAdapter string,
52 toDeviceId string,
53 proxyDeviceId string,
54 messageId string) error {
Rohan Agrawal31f21802020-06-12 05:38:46 +000055 logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
khenaidood2b6df92018-12-13 16:37:20 -050056 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
57
58 //Marshal the message
59 var marshalledMsg *any.Any
60 var err error
61 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000062 logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
khenaidood2b6df92018-12-13 16:37:20 -050063 return err
64 }
65
Girish Kumardef46fc2020-08-05 18:20:11 +000066 // Set up the required rpc arguments
67 endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
68 if err != nil {
69 return err
70 }
71
khenaidood2b6df92018-12-13 16:37:20 -050072 //Build the inter adapter message
73 header := &ic.InterAdapterHeader{
74 Type: msgType,
75 FromTopic: fromAdapter,
Girish Kumardef46fc2020-08-05 18:20:11 +000076 ToTopic: string(endpoint),
khenaidood2b6df92018-12-13 16:37:20 -050077 ToDeviceId: toDeviceId,
78 ProxyDeviceId: proxyDeviceId,
79 }
80 if messageId != "" {
81 header.Id = messageId
82 } else {
83 header.Id = uuid.New().String()
84 }
Scott Baker504b4802020-04-17 10:12:20 -070085 header.Timestamp = ptypes.TimestampNow()
khenaidood2b6df92018-12-13 16:37:20 -050086 iaMsg := &ic.InterAdapterMessage{
87 Header: header,
88 Body: marshalledMsg,
89 }
90 args := make([]*kafka.KVArg, 1)
91 args[0] = &kafka.KVArg{
92 Key: "msg",
93 Value: iaMsg,
94 }
95
Matteo Scandolod525ae32020-04-02 17:27:29 -070096 topic := kafka.Topic{Name: string(endpoint)}
cuilin20186b6a9952019-04-03 22:37:11 -070097 replyToTopic := kafka.Topic{Name: fromAdapter}
98 rpc := "process_inter_adapter_message"
khenaidood2b6df92018-12-13 16:37:20 -050099
Girish Kumardef46fc2020-08-05 18:20:11 +0000100 // Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
101 ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
khenaidoobdcb8e02019-03-06 16:28:56 -0500102 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000103 logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
104 return unPackResponse(ctx, rpc, "", success, result)
khenaidood2b6df92018-12-13 16:37:20 -0500105}