blob: 939f301f15f945ac184f02db686df4875ba1510f [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
import (
	"context"
	"errors"

	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
	"github.com/opencord/voltha-protos/v3/go/openflow_13"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)
28
npujar1d86a522019-11-14 17:11:16 +053029// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040030type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040031 kafka.EndpointManager
khenaidoo54e0ddf2019-02-27 16:21:33 -050032 deviceTopicRegistered bool
serkant.uluderya8ff291d2020-05-20 00:58:00 -070033 coreTopic string
npujar467fe752020-01-16 20:17:45 +053034 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040035}
36
npujar1d86a522019-11-14 17:11:16 +053037// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070038func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040039 return &AdapterProxy{
Kent Hagerman2b216042020-04-03 18:28:56 -040040 EndpointManager: endpointManager,
Kent Hagermana6d0c362019-07-30 12:50:21 -040041 kafkaICProxy: kafkaProxy,
serkant.uluderya8ff291d2020-05-20 00:58:00 -070042 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040043 deviceTopicRegistered: false,
44 }
khenaidoob9203542018-09-17 22:56:37 -040045}
46
serkant.uluderya334479d2019-04-10 08:26:15 -070047func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070048 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050049}
50
Matteo Scandolod525ae32020-04-02 17:27:29 -070051func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
52
Kent Hagerman2b216042020-04-03 18:28:56 -040053 endpoint, err := ap.GetEndpoint(deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070054 if err != nil {
55 return nil, err
56 }
57
58 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050059}
60
khenaidoo442e7c72020-03-10 16:13:48 -040061func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
62 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
63
64 // Sent the request to kafka
65 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
66
67 // Wait for first response which would indicate whether the request was successfully sent to kafka.
68 firstResponse, ok := <-respChnl
69 if !ok || firstResponse.MType != kafka.RpcSent {
Girish Kumarf56a4682020-03-20 20:07:46 +000070 logger.Errorw("failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040071 return nil, firstResponse.Err
72 }
73 // return the kafka channel for the caller to wait for the response of the RPC call
74 return respChnl, nil
75}
76
Kent Hagerman2b216042020-04-03 18:28:56 -040077// AdoptDevice invokes adopt device rpc
78func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
79 logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040080 rpc := "adopt_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070081 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
82 if err != nil {
83 return nil, err
84 }
khenaidoo442e7c72020-03-10 16:13:48 -040085 args := []*kafka.KVArg{
86 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040087 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050088 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -070090 logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
91 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040092}
93
Kent Hagerman2b216042020-04-03 18:28:56 -040094// DisableDevice invokes disable device rpc
95func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
96 logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040097 rpc := "disable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070098 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
99 if err != nil {
100 return nil, err
101 }
khenaidoo442e7c72020-03-10 16:13:48 -0400102 args := []*kafka.KVArg{
103 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400104 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500105 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700106 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
Kent Hagerman2b216042020-04-03 18:28:56 -0400109// ReEnableDevice invokes reenable device rpc
110func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
111 logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400112 rpc := "reenable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700113 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
114 if err != nil {
115 return nil, err
116 }
khenaidoo442e7c72020-03-10 16:13:48 -0400117 args := []*kafka.KVArg{
118 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500120 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700121 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400122}
123
Kent Hagerman2b216042020-04-03 18:28:56 -0400124// RebootDevice invokes reboot device rpc
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
126 logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 rpc := "reboot_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700128 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
129 if err != nil {
130 return nil, err
131 }
khenaidoo442e7c72020-03-10 16:13:48 -0400132 args := []*kafka.KVArg{
133 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400134 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700136 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137}
138
Kent Hagerman2b216042020-04-03 18:28:56 -0400139// DeleteDevice invokes delete device rpc
140func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
141 logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400142 rpc := "delete_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700143 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
144 if err != nil {
145 return nil, err
146 }
khenaidoo442e7c72020-03-10 16:13:48 -0400147 args := []*kafka.KVArg{
148 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400149 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700151 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400152}
153
Kent Hagerman2b216042020-04-03 18:28:56 -0400154// GetOfpDeviceInfo invokes get ofp device info rpc
155func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
156 logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400157 rpc := "get_ofp_device_info"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700158 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
159 if err != nil {
160 return nil, err
161 }
khenaidoo442e7c72020-03-10 16:13:48 -0400162 args := []*kafka.KVArg{
163 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400164 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500165 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700166 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400167}
168
Kent Hagerman2b216042020-04-03 18:28:56 -0400169// ReconcileDevice invokes reconcile device rpc
170func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
171 logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500172 rpc := "reconcile_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700173 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
174 if err != nil {
175 return nil, err
176 }
khenaidooba6b6c42019-08-02 09:11:56 -0400177 args := []*kafka.KVArg{
178 {Key: "device", Value: device},
179 }
khenaidooba6b6c42019-08-02 09:11:56 -0400180 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700181 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400182}
183
Kent Hagerman2b216042020-04-03 18:28:56 -0400184// DownloadImage invokes download image rpc
185func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
186 logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500187 rpc := "download_image"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700188 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
189 if err != nil {
190 return nil, err
191 }
khenaidoo442e7c72020-03-10 16:13:48 -0400192 args := []*kafka.KVArg{
193 {Key: "device", Value: device},
194 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500195 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500196 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700197 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400198}
199
Kent Hagerman2b216042020-04-03 18:28:56 -0400200// GetImageDownloadStatus invokes get image download status rpc
201func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
202 logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500203 rpc := "get_image_download_status"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700204 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
205 if err != nil {
206 return nil, err
207 }
khenaidoo442e7c72020-03-10 16:13:48 -0400208 args := []*kafka.KVArg{
209 {Key: "device", Value: device},
210 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500211 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500212 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700213 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400214}
215
Kent Hagerman2b216042020-04-03 18:28:56 -0400216// CancelImageDownload invokes cancel image download rpc
217func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
218 logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500219 rpc := "cancel_image_download"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700220 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
221 if err != nil {
222 return nil, err
223 }
khenaidoo442e7c72020-03-10 16:13:48 -0400224 args := []*kafka.KVArg{
225 {Key: "device", Value: device},
226 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500227 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500228 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700229 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400230}
231
Kent Hagerman2b216042020-04-03 18:28:56 -0400232// ActivateImageUpdate invokes activate image update rpc
233func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
234 logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500235 rpc := "activate_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700236 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
237 if err != nil {
238 return nil, err
239 }
khenaidoo442e7c72020-03-10 16:13:48 -0400240 args := []*kafka.KVArg{
241 {Key: "device", Value: device},
242 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500243 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500244 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700245 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400246}
247
Kent Hagerman2b216042020-04-03 18:28:56 -0400248// RevertImageUpdate invokes revert image update rpc
249func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
250 logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500251 rpc := "revert_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700252 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
253 if err != nil {
254 return nil, err
255 }
khenaidoo442e7c72020-03-10 16:13:48 -0400256 args := []*kafka.KVArg{
257 {Key: "device", Value: device},
258 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500259 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500260 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700261 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400262}
263
Kent Hagerman2b216042020-04-03 18:28:56 -0400264func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
265 logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700266 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
267 if err != nil {
268 return nil, err
269 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500270 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400271 args := []*kafka.KVArg{
272 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
273 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
274 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500275 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500276 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700277 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500278}
279
Kent Hagerman2b216042020-04-03 18:28:56 -0400280// UpdateFlowsBulk invokes update flows bulk rpc
281func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
282 logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700283 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
284 if err != nil {
285 return nil, err
286 }
khenaidoo19d7b632018-10-30 10:49:50 -0400287 rpc := "update_flows_bulk"
khenaidoo442e7c72020-03-10 16:13:48 -0400288 args := []*kafka.KVArg{
289 {Key: "device", Value: device},
290 {Key: "flows", Value: flows},
291 {Key: "groups", Value: groups},
292 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400293 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500294 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700295 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400296}
297
Kent Hagerman2b216042020-04-03 18:28:56 -0400298// UpdateFlowsIncremental invokes update flows incremental rpc
299func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
300 logger.Debugw("UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400301 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400302 "device-id": device.Id,
303 "flow-to-add-count": len(flowChanges.ToAdd.Items),
304 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
305 "group-to-add-count": len(groupChanges.ToAdd.Items),
306 "group-to-delete-count": len(groupChanges.ToRemove.Items),
307 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400308 })
Matteo Scandolod525ae32020-04-02 17:27:29 -0700309 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
310 if err != nil {
311 return nil, err
312 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400313 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400314 args := []*kafka.KVArg{
315 {Key: "device", Value: device},
316 {Key: "flow_changes", Value: flowChanges},
317 {Key: "group_changes", Value: groupChanges},
318 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400319 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500320 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700321 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400322}
323
Kent Hagerman2b216042020-04-03 18:28:56 -0400324// UpdatePmConfigs invokes update pm configs rpc
325func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
326 logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700327 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
328 if err != nil {
329 return nil, err
330 }
Rohan Agrawal2a0c4492020-06-29 11:55:06 +0000331 rpc := "update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400332 args := []*kafka.KVArg{
333 {Key: "device", Value: device},
334 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400335 }
khenaidoob3127472019-07-24 21:04:55 -0400336 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700337 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400338}
339
Kent Hagerman2b216042020-04-03 18:28:56 -0400340// SimulateAlarm invokes simulate alarm rpc
341func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
342 logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700343 rpc := "simulate_alarm"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700344 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
345 if err != nil {
346 return nil, err
347 }
khenaidoo442e7c72020-03-10 16:13:48 -0400348 args := []*kafka.KVArg{
349 {Key: "device", Value: device},
350 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700351 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700352 replyToTopic := ap.getCoreTopic()
353 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -0700354 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700355}
kesavandbc2d1622020-01-21 00:42:01 -0500356
Kent Hagerman2b216042020-04-03 18:28:56 -0400357func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
358 logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500359 rpc := "disable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700360 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
361 if err != nil {
362 return nil, err
363 }
khenaidoo442e7c72020-03-10 16:13:48 -0400364 args := []*kafka.KVArg{
365 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
366 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500367 }
kesavandbc2d1622020-01-21 00:42:01 -0500368 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700369 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500370}
371
Kent Hagerman2b216042020-04-03 18:28:56 -0400372func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
373 logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500374 rpc := "enable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700375 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
376 if err != nil {
377 return nil, err
378 }
khenaidoo442e7c72020-03-10 16:13:48 -0400379 args := []*kafka.KVArg{
380 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
381 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500382 }
kesavandbc2d1622020-01-21 00:42:01 -0500383 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700384 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500385}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500386
Kent Hagerman2b216042020-04-03 18:28:56 -0400387// ChildDeviceLost invokes child device_lost rpc
388func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
389 logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500390 rpc := "child_device_lost"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700391 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
392 if err != nil {
393 return nil, err
394 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500395 args := []*kafka.KVArg{
Matteo Scandolod525ae32020-04-02 17:27:29 -0700396 {Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
khenaidoo442e7c72020-03-10 16:13:48 -0400397 {Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
398 {Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
399 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500400 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700401 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500402}
onkarkundargi87285252020-01-27 11:34:52 +0530403
Kent Hagerman2b216042020-04-03 18:28:56 -0400404func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Girish Kumarf56a4682020-03-20 20:07:46 +0000405 logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530406 rpc := "start_omci_test"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700407 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
408 if err != nil {
409 return nil, err
410 }
onkarkundargi87285252020-01-27 11:34:52 +0530411 // Use a device specific topic as we are the only core handling requests for this device
412 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700413 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
414 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700415 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530416 &kafka.KVArg{Key: "device", Value: device},
417 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
418}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800419
420func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
421 log.Debugw("GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
422 rpc := "get_ext_value"
423 toTopic, err := ap.getAdapterTopic(pdevice.Id, pdevice.Adapter)
424 if err != nil {
425 return nil, err
426 }
427 // Use a device specific topic to send the request. The adapter handling the device creates a device
428 // specific topic
429 args := []*kafka.KVArg{
430 {
431 Key: "pDeviceId",
432 Value: &ic.StrType{Val: pdevice.Id},
433 },
434 {
435 Key: "device",
436 Value: cdevice,
437 },
438 {
439 Key: "valuetype",
440 Value: &ic.IntType{Val: int64(valuetype)},
441 }}
442
443 replyToTopic := ap.getCoreTopic()
444 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
445}