blob: ce4cf49378f9181f548bcbbe0445ea0ea4538756 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
19import (
20 "context"
serkant.uluderya8ff291d2020-05-20 00:58:00 -070021
Maninderdfadc982020-10-28 14:04:33 +053022 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v4/pkg/log"
Salman Siddiqui1cf95042020-11-19 00:42:56 +053024 "github.com/opencord/voltha-protos/v4/go/extension"
Maninderdfadc982020-10-28 14:04:33 +053025 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
26 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
27 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040028)
29
npujar1d86a522019-11-14 17:11:16 +053030// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040031type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040032 kafka.EndpointManager
khenaidoo54e0ddf2019-02-27 16:21:33 -050033 deviceTopicRegistered bool
serkant.uluderya8ff291d2020-05-20 00:58:00 -070034 coreTopic string
npujar467fe752020-01-16 20:17:45 +053035 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040036}
37
npujar1d86a522019-11-14 17:11:16 +053038// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070039func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040040 return &AdapterProxy{
Kent Hagerman2b216042020-04-03 18:28:56 -040041 EndpointManager: endpointManager,
Kent Hagermana6d0c362019-07-30 12:50:21 -040042 kafkaICProxy: kafkaProxy,
serkant.uluderya8ff291d2020-05-20 00:58:00 -070043 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040044 deviceTopicRegistered: false,
45 }
khenaidoob9203542018-09-17 22:56:37 -040046}
47
serkant.uluderya334479d2019-04-10 08:26:15 -070048func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070049 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050050}
51
Rohan Agrawal31f21802020-06-12 05:38:46 +000052func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
Matteo Scandolod525ae32020-04-02 17:27:29 -070053
Rohan Agrawal31f21802020-06-12 05:38:46 +000054 endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070055 if err != nil {
56 return nil, err
57 }
58
59 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050060}
61
khenaidoo442e7c72020-03-10 16:13:48 -040062func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
63 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
64
65 // Sent the request to kafka
66 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
67
68 // Wait for first response which would indicate whether the request was successfully sent to kafka.
69 firstResponse, ok := <-respChnl
70 if !ok || firstResponse.MType != kafka.RpcSent {
Rohan Agrawal31f21802020-06-12 05:38:46 +000071 logger.Errorw(ctx, "failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040072 return nil, firstResponse.Err
73 }
74 // return the kafka channel for the caller to wait for the response of the RPC call
75 return respChnl, nil
76}
77
Kent Hagerman2b216042020-04-03 18:28:56 -040078// AdoptDevice invokes adopt device rpc
79func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000080 logger.Debugw(ctx, "AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040081 rpc := "adopt_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000082 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070083 if err != nil {
84 return nil, err
85 }
khenaidoo442e7c72020-03-10 16:13:48 -040086 args := []*kafka.KVArg{
87 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040088 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050090 ap.deviceTopicRegistered = true
Rohan Agrawal31f21802020-06-12 05:38:46 +000091 logger.Debugw(ctx, "adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
Matteo Scandolod525ae32020-04-02 17:27:29 -070092 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040093}
94
Kent Hagerman2b216042020-04-03 18:28:56 -040095// DisableDevice invokes disable device rpc
96func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000097 logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040098 rpc := "disable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000099 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700100 if err != nil {
101 return nil, err
102 }
khenaidoo442e7c72020-03-10 16:13:48 -0400103 args := []*kafka.KVArg{
104 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400105 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500106 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700107 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400108}
109
Kent Hagerman2b216042020-04-03 18:28:56 -0400110// ReEnableDevice invokes reenable device rpc
111func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000112 logger.Debugw(ctx, "ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400113 rpc := "reenable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000114 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700115 if err != nil {
116 return nil, err
117 }
khenaidoo442e7c72020-03-10 16:13:48 -0400118 args := []*kafka.KVArg{
119 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400120 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500121 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700122 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400123}
124
Kent Hagerman2b216042020-04-03 18:28:56 -0400125// RebootDevice invokes reboot device rpc
126func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000127 logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400128 rpc := "reboot_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000129 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700130 if err != nil {
131 return nil, err
132 }
khenaidoo442e7c72020-03-10 16:13:48 -0400133 args := []*kafka.KVArg{
134 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400135 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500136 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700137 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400138}
139
Kent Hagerman2b216042020-04-03 18:28:56 -0400140// DeleteDevice invokes delete device rpc
141func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000142 logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400143 rpc := "delete_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000144 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700145 if err != nil {
146 return nil, err
147 }
khenaidoo442e7c72020-03-10 16:13:48 -0400148 args := []*kafka.KVArg{
149 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400150 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500151 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700152 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400153}
154
Kent Hagerman2b216042020-04-03 18:28:56 -0400155// GetOfpDeviceInfo invokes get ofp device info rpc
156func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000157 logger.Debugw(ctx, "GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400158 rpc := "get_ofp_device_info"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000159 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700160 if err != nil {
161 return nil, err
162 }
khenaidoo442e7c72020-03-10 16:13:48 -0400163 args := []*kafka.KVArg{
164 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400165 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500166 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700167 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400168}
169
Kent Hagerman2b216042020-04-03 18:28:56 -0400170// ReconcileDevice invokes reconcile device rpc
171func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000172 logger.Debugw(ctx, "ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500173 rpc := "reconcile_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000174 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700175 if err != nil {
176 return nil, err
177 }
khenaidooba6b6c42019-08-02 09:11:56 -0400178 args := []*kafka.KVArg{
179 {Key: "device", Value: device},
180 }
khenaidooba6b6c42019-08-02 09:11:56 -0400181 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700182 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400183}
184
Kent Hagerman2b216042020-04-03 18:28:56 -0400185// DownloadImage invokes download image rpc
186func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000187 logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500188 rpc := "download_image"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000189 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700190 if err != nil {
191 return nil, err
192 }
khenaidoo442e7c72020-03-10 16:13:48 -0400193 args := []*kafka.KVArg{
194 {Key: "device", Value: device},
195 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500196 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500197 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700198 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400199}
200
Kent Hagerman2b216042020-04-03 18:28:56 -0400201// GetImageDownloadStatus invokes get image download status rpc
202func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000203 logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500204 rpc := "get_image_download_status"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000205 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700206 if err != nil {
207 return nil, err
208 }
khenaidoo442e7c72020-03-10 16:13:48 -0400209 args := []*kafka.KVArg{
210 {Key: "device", Value: device},
211 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500212 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500213 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700214 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400215}
216
Kent Hagerman2b216042020-04-03 18:28:56 -0400217// CancelImageDownload invokes cancel image download rpc
218func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000219 logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500220 rpc := "cancel_image_download"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000221 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700222 if err != nil {
223 return nil, err
224 }
khenaidoo442e7c72020-03-10 16:13:48 -0400225 args := []*kafka.KVArg{
226 {Key: "device", Value: device},
227 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500228 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500229 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700230 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400231}
232
Kent Hagerman2b216042020-04-03 18:28:56 -0400233// ActivateImageUpdate invokes activate image update rpc
234func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000235 logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500236 rpc := "activate_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000237 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700238 if err != nil {
239 return nil, err
240 }
khenaidoo442e7c72020-03-10 16:13:48 -0400241 args := []*kafka.KVArg{
242 {Key: "device", Value: device},
243 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500244 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500245 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700246 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400247}
248
Kent Hagerman2b216042020-04-03 18:28:56 -0400249// RevertImageUpdate invokes revert image update rpc
250func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000251 logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500252 rpc := "revert_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000253 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700254 if err != nil {
255 return nil, err
256 }
khenaidoo442e7c72020-03-10 16:13:48 -0400257 args := []*kafka.KVArg{
258 {Key: "device", Value: device},
259 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500260 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500261 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700262 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400263}
264
Kent Hagermana7c9d792020-07-16 17:39:01 -0400265func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000266 logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
267 toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700268 if err != nil {
269 return nil, err
270 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500271 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400272 args := []*kafka.KVArg{
273 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
274 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
275 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500276 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500277 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700278 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500279}
280
Kent Hagerman2b216042020-04-03 18:28:56 -0400281// UpdateFlowsBulk invokes update flows bulk rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400282func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
283 logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
Rohan Agrawal31f21802020-06-12 05:38:46 +0000284 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700285 if err != nil {
286 return nil, err
287 }
khenaidoo19d7b632018-10-30 10:49:50 -0400288 rpc := "update_flows_bulk"
Kent Hagermana7c9d792020-07-16 17:39:01 -0400289
290 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
291 for _, flow := range flows {
292 flowSlice[ctr] = flow
293 ctr++
294 }
295 ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
296 for _, group := range groups {
297 groupSlice[ctr] = group
298 ctr++
299 }
khenaidoo442e7c72020-03-10 16:13:48 -0400300 args := []*kafka.KVArg{
301 {Key: "device", Value: device},
Kent Hagermana7c9d792020-07-16 17:39:01 -0400302 {Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
303 {Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
khenaidoo442e7c72020-03-10 16:13:48 -0400304 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400305 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500306 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000307 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400308}
309
Kent Hagerman2b216042020-04-03 18:28:56 -0400310// UpdateFlowsIncremental invokes update flows incremental rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400311func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000312 logger.Debugw(ctx, "UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400313 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400314 "device-id": device.Id,
315 "flow-to-add-count": len(flowChanges.ToAdd.Items),
316 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
317 "group-to-add-count": len(groupChanges.ToAdd.Items),
318 "group-to-delete-count": len(groupChanges.ToRemove.Items),
319 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400320 })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000321 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700322 if err != nil {
323 return nil, err
324 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400325 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400326 args := []*kafka.KVArg{
327 {Key: "device", Value: device},
328 {Key: "flow_changes", Value: flowChanges},
329 {Key: "group_changes", Value: groupChanges},
330 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400331 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500332 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000333 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400334}
335
Kent Hagerman2b216042020-04-03 18:28:56 -0400336// UpdatePmConfigs invokes update pm configs rpc
337func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000338 logger.Debugw(ctx, "UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
339 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700340 if err != nil {
341 return nil, err
342 }
Rohan Agrawal2a0c4492020-06-29 11:55:06 +0000343 rpc := "update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400344 args := []*kafka.KVArg{
345 {Key: "device", Value: device},
346 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400347 }
khenaidoob3127472019-07-24 21:04:55 -0400348 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700349 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400350}
351
Kent Hagerman2b216042020-04-03 18:28:56 -0400352// SimulateAlarm invokes simulate alarm rpc
353func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000354 logger.Debugw(ctx, "SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700355 rpc := "simulate_alarm"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000356 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700357 if err != nil {
358 return nil, err
359 }
khenaidoo442e7c72020-03-10 16:13:48 -0400360 args := []*kafka.KVArg{
361 {Key: "device", Value: device},
362 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700363 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700364 replyToTopic := ap.getCoreTopic()
365 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -0700366 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700367}
kesavandbc2d1622020-01-21 00:42:01 -0500368
Kent Hagerman2b216042020-04-03 18:28:56 -0400369func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000370 logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500371 rpc := "disable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000372 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700373 if err != nil {
374 return nil, err
375 }
khenaidoo442e7c72020-03-10 16:13:48 -0400376 args := []*kafka.KVArg{
377 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
378 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500379 }
kesavandbc2d1622020-01-21 00:42:01 -0500380 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700381 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500382}
383
Kent Hagerman2b216042020-04-03 18:28:56 -0400384func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000385 logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500386 rpc := "enable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000387 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700388 if err != nil {
389 return nil, err
390 }
khenaidoo442e7c72020-03-10 16:13:48 -0400391 args := []*kafka.KVArg{
392 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
393 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500394 }
kesavandbc2d1622020-01-21 00:42:01 -0500395 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700396 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500397}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500398
Kent Hagerman2b216042020-04-03 18:28:56 -0400399// ChildDeviceLost invokes child device_lost rpc
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800400func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, childDevice *voltha.Device) (chan *kafka.RpcResponse, error) {
401 logger.Debugw(ctx, "ChildDeviceLost",
402 log.Fields{"device-id": childDevice.ParentId, "parent-port-no": childDevice.ParentPortNo,
403 "onu-id": childDevice.ProxyAddress.OnuId, "serial-number": childDevice.SerialNumber})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500404 rpc := "child_device_lost"
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800405 toTopic, err := ap.getAdapterTopic(ctx, childDevice.ParentId, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700406 if err != nil {
407 return nil, err
408 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500409 args := []*kafka.KVArg{
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800410 {Key: "childDevice", Value: childDevice},
khenaidoo442e7c72020-03-10 16:13:48 -0400411 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500412 replyToTopic := ap.getCoreTopic()
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800413 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, childDevice.ParentId, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500414}
onkarkundargi87285252020-01-27 11:34:52 +0530415
Kent Hagerman2b216042020-04-03 18:28:56 -0400416func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000417 logger.Debugw(ctx, "Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530418 rpc := "start_omci_test"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000419 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700420 if err != nil {
421 return nil, err
422 }
onkarkundargi87285252020-01-27 11:34:52 +0530423 // Use a device specific topic as we are the only core handling requests for this device
424 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700425 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
426 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700427 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530428 &kafka.KVArg{Key: "device", Value: device},
429 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
430}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800431
432func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
Girish Kumar3e8ee212020-08-19 17:50:11 +0000433 logger.Debugw(ctx, "GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800434 rpc := "get_ext_value"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000435 toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800436 if err != nil {
437 return nil, err
438 }
439 // Use a device specific topic to send the request. The adapter handling the device creates a device
440 // specific topic
441 args := []*kafka.KVArg{
442 {
443 Key: "pDeviceId",
444 Value: &ic.StrType{Val: pdevice.Id},
445 },
446 {
447 Key: "device",
448 Value: cdevice,
449 },
450 {
451 Key: "valuetype",
452 Value: &ic.IntType{Val: int64(valuetype)},
453 }}
454
455 replyToTopic := ap.getCoreTopic()
456 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
457}
dpaul62686312020-06-23 14:17:36 +0530458
459// SetExtValue set some given configs or value
460func (ap *AdapterProxy) SetExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (chan *kafka.RpcResponse, error) {
461 logger.Debugw(ctx, "SetExtValue", log.Fields{"device-id": value.Id})
462 rpc := "set_ext_value"
463 toTopic, err := ap.getAdapterTopic(ctx, value.Id, device.Adapter)
464 if err != nil {
465 return nil, err
466 }
467 // Use a device specific topic to send the request. The adapter handling the device creates a device
468 // specific topic
469 args := []*kafka.KVArg{
470 {
471 Key: "value",
472 Value: value,
473 },
474 }
475 replyToTopic := ap.getCoreTopic()
476 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, value.Id, args...)
477}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530478
479// GetSingleValue get a value from the adapter, based on the request type
480func (ap *AdapterProxy) GetSingleValue(ctx context.Context, adapterType string, request *extension.SingleGetValueRequest) (chan *kafka.RpcResponse, error) {
481 logger.Debugw(ctx, "GetSingleValue", log.Fields{"device-id": request.TargetId})
482 rpc := "single_get_value_request"
483 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
484 if err != nil {
485 return nil, err
486 }
487
488 // Use a device specific topic to send the request. The adapter handling the device creates a device
489 // specific topic
490 args := []*kafka.KVArg{
491 {
492 Key: "request",
493 Value: request,
494 },
495 }
496
497 replyToTopic := ap.getCoreTopic()
498 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
499}
500
501// SetSingleValue set a single value on the adapter, based on the request type
502func (ap *AdapterProxy) SetSingleValue(ctx context.Context, adapterType string, request *extension.SingleSetValueRequest) (chan *kafka.RpcResponse, error) {
503 logger.Debugw(ctx, "SetSingleValue", log.Fields{"device-id": request.TargetId})
504 rpc := "single_set_value_request"
505 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
506 if err != nil {
507 return nil, err
508 }
509
510 // Use a device specific topic to send the request. The adapter handling the device creates a device
511 // specific topic
512 args := []*kafka.KVArg{
513 {
514 Key: "request",
515 Value: request,
516 },
517 }
518
519 replyToTopic := ap.getCoreTopic()
520 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
521}