blob: fc310412f2fda47dc756cebecb5cf957bf4b26ab [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package common
17
18import (
19 "context"
Girish Gowdra248971a2021-06-01 15:14:15 -070020 "github.com/opencord/voltha-lib-go/v5/pkg/db"
21 "google.golang.org/grpc/status"
serkant.uluderyab38671c2019-11-01 09:35:38 -070022
Scott Baker2c1c4822019-10-16 11:02:41 -070023 "github.com/golang/protobuf/proto"
24 "github.com/golang/protobuf/ptypes"
25 "github.com/golang/protobuf/ptypes/any"
26 "github.com/google/uuid"
Girish Gowdra248971a2021-06-01 15:14:15 -070027 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
28 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Girish Gowdra89c985b2020-10-14 15:02:09 -070029 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Scott Baker2c1c4822019-10-16 11:02:41 -070030)
31
32type AdapterProxy struct {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080033 kafkaICProxy kafka.InterContainerProxy
Scott Baker2c1c4822019-10-16 11:02:41 -070034 coreTopic string
khenaidoob6238b32020-04-07 12:07:36 -040035 endpointMgr kafka.EndpointManager
Scott Baker2c1c4822019-10-16 11:02:41 -070036}
37
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000038func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
khenaidoob6238b32020-04-07 12:07:36 -040039 proxy := AdapterProxy{
40 kafkaICProxy: kafkaProxy,
khenaidoob6238b32020-04-07 12:07:36 -040041 coreTopic: coreTopic,
42 endpointMgr: kafka.NewEndpointManager(backend),
43 }
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000044 logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
Scott Baker2c1c4822019-10-16 11:02:41 -070045 return &proxy
46}
47
48func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
49 msg proto.Message,
50 msgType ic.InterAdapterMessageType_Types,
51 fromAdapter string,
52 toAdapter string,
53 toDeviceId string,
54 proxyDeviceId string,
55 messageId string) error {
Neha Sharma94f16a92020-06-26 04:17:55 +000056 logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
Scott Baker2c1c4822019-10-16 11:02:41 -070057 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
58
59 //Marshal the message
60 var marshalledMsg *any.Any
61 var err error
62 if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000063 logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -070064 return err
65 }
66
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000067 // Set up the required rpc arguments
68 endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
69 if err != nil {
70 return err
71 }
72
Scott Baker2c1c4822019-10-16 11:02:41 -070073 //Build the inter adapter message
74 header := &ic.InterAdapterHeader{
75 Type: msgType,
76 FromTopic: fromAdapter,
Rohan Agrawal00e5efc2020-07-24 11:43:02 +000077 ToTopic: string(endpoint),
Scott Baker2c1c4822019-10-16 11:02:41 -070078 ToDeviceId: toDeviceId,
79 ProxyDeviceId: proxyDeviceId,
80 }
81 if messageId != "" {
82 header.Id = messageId
83 } else {
84 header.Id = uuid.New().String()
85 }
Scott Baker84a55ce2020-04-17 10:11:30 -070086 header.Timestamp = ptypes.TimestampNow()
Scott Baker2c1c4822019-10-16 11:02:41 -070087 iaMsg := &ic.InterAdapterMessage{
88 Header: header,
89 Body: marshalledMsg,
90 }
91 args := make([]*kafka.KVArg, 1)
92 args[0] = &kafka.KVArg{
93 Key: "msg",
94 Value: iaMsg,
95 }
96
khenaidoob6238b32020-04-07 12:07:36 -040097 topic := kafka.Topic{Name: string(endpoint)}
Scott Baker2c1c4822019-10-16 11:02:41 -070098 replyToTopic := kafka.Topic{Name: fromAdapter}
99 rpc := "process_inter_adapter_message"
100
Girish Kumar74240652020-07-10 11:54:28 +0000101 // Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
102 ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
Scott Baker2c1c4822019-10-16 11:02:41 -0700103 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
Neha Sharma94f16a92020-06-26 04:17:55 +0000104 logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
105 return unPackResponse(ctx, rpc, "", success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700106}
Girish Gowdra248971a2021-06-01 15:14:15 -0700107
108func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
109 tpPath string,
110 parentPonPort uint32,
111 onuID uint32,
112 uniID uint32,
113 fromAdapter string,
114 toAdapter string,
115 toDeviceId string,
116 proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
117 logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
118 "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
119
120 // Set up the required rpc arguments
121 endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
122 if err != nil {
123 return nil, err
124 }
125
126 //Build the inter adapter message
127 tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
128 TpInstancePath: tpPath,
129 ParentDeviceId: toDeviceId,
130 ParentPonPort: parentPonPort,
131 OnuId: onuID,
132 UniId: uniID,
133 }
134
135 args := make([]*kafka.KVArg, 1)
136 args[0] = &kafka.KVArg{
137 Key: "msg",
138 Value: tpReqMsg,
139 }
140
141 topic := kafka.Topic{Name: string(endpoint)}
142 replyToTopic := kafka.Topic{Name: fromAdapter}
143 rpc := "process_tech_profile_instance_request"
144
145 ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
146 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
147 logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
148 if success {
149 tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
150 if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
151 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
152 return nil, err
153 }
154 return tpDwnldMsg, nil
155 } else {
156 unpackResult := &ic.Error{}
157 var err error
158 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
159 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
160 }
161 logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
162
163 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
164 }
165}