blob: 71dedb30dc99c8e8e4203c63edc22b9436037319 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
19import (
20 "context"
serkant.uluderya8ff291d2020-05-20 00:58:00 -070021
ssiddiquif076cb82021-04-23 10:47:04 +053022 "github.com/opencord/voltha-protos/v4/go/common"
23
Maninderdfadc982020-10-28 14:04:33 +053024 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v4/pkg/log"
Salman Siddiqui1cf95042020-11-19 00:42:56 +053026 "github.com/opencord/voltha-protos/v4/go/extension"
Maninderdfadc982020-10-28 14:04:33 +053027 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
28 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
29 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040030)
31
npujar1d86a522019-11-14 17:11:16 +053032// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040033type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040034 kafka.EndpointManager
khenaidoodd3324d2021-04-27 16:22:55 -040035 coreTopic string
36 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040037}
38
npujar1d86a522019-11-14 17:11:16 +053039// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070040func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040041 return &AdapterProxy{
khenaidoodd3324d2021-04-27 16:22:55 -040042 EndpointManager: endpointManager,
43 kafkaICProxy: kafkaProxy,
44 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040045 }
khenaidoob9203542018-09-17 22:56:37 -040046}
47
serkant.uluderya334479d2019-04-10 08:26:15 -070048func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070049 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050050}
51
Rohan Agrawal31f21802020-06-12 05:38:46 +000052func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
Matteo Scandolod525ae32020-04-02 17:27:29 -070053
Rohan Agrawal31f21802020-06-12 05:38:46 +000054 endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070055 if err != nil {
56 return nil, err
57 }
58
59 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050060}
61
khenaidoo442e7c72020-03-10 16:13:48 -040062func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
63 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
64
65 // Sent the request to kafka
66 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
67
68 // Wait for first response which would indicate whether the request was successfully sent to kafka.
69 firstResponse, ok := <-respChnl
70 if !ok || firstResponse.MType != kafka.RpcSent {
Rohan Agrawal31f21802020-06-12 05:38:46 +000071 logger.Errorw(ctx, "failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040072 return nil, firstResponse.Err
73 }
74 // return the kafka channel for the caller to wait for the response of the RPC call
75 return respChnl, nil
76}
77
Kent Hagerman2b216042020-04-03 18:28:56 -040078// AdoptDevice invokes adopt device rpc
79func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000080 logger.Debugw(ctx, "AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040081 rpc := "adopt_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000082 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070083 if err != nil {
84 return nil, err
85 }
khenaidoo442e7c72020-03-10 16:13:48 -040086 args := []*kafka.KVArg{
87 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040088 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 replyToTopic := ap.getCoreTopic()
Rohan Agrawal31f21802020-06-12 05:38:46 +000090 logger.Debugw(ctx, "adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
Matteo Scandolod525ae32020-04-02 17:27:29 -070091 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040092}
93
Kent Hagerman2b216042020-04-03 18:28:56 -040094// DisableDevice invokes disable device rpc
95func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000096 logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040097 rpc := "disable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000098 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070099 if err != nil {
100 return nil, err
101 }
khenaidoo442e7c72020-03-10 16:13:48 -0400102 args := []*kafka.KVArg{
103 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400104 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500105 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700106 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
Kent Hagerman2b216042020-04-03 18:28:56 -0400109// ReEnableDevice invokes reenable device rpc
110func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000111 logger.Debugw(ctx, "ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400112 rpc := "reenable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000113 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700114 if err != nil {
115 return nil, err
116 }
khenaidoo442e7c72020-03-10 16:13:48 -0400117 args := []*kafka.KVArg{
118 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500120 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700121 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400122}
123
Kent Hagerman2b216042020-04-03 18:28:56 -0400124// RebootDevice invokes reboot device rpc
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 rpc := "reboot_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000128 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700129 if err != nil {
130 return nil, err
131 }
khenaidoo442e7c72020-03-10 16:13:48 -0400132 args := []*kafka.KVArg{
133 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400134 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700136 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137}
138
Kent Hagerman2b216042020-04-03 18:28:56 -0400139// DeleteDevice invokes delete device rpc
140func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000141 logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400142 rpc := "delete_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000143 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700144 if err != nil {
145 return nil, err
146 }
khenaidoo442e7c72020-03-10 16:13:48 -0400147 args := []*kafka.KVArg{
148 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400149 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700151 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400152}
153
Kent Hagerman2b216042020-04-03 18:28:56 -0400154// GetOfpDeviceInfo invokes get ofp device info rpc
155func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000156 logger.Debugw(ctx, "GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400157 rpc := "get_ofp_device_info"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000158 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700159 if err != nil {
160 return nil, err
161 }
khenaidoo442e7c72020-03-10 16:13:48 -0400162 args := []*kafka.KVArg{
163 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400164 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500165 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700166 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400167}
168
Kent Hagerman2b216042020-04-03 18:28:56 -0400169// ReconcileDevice invokes reconcile device rpc
170func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000171 logger.Debugw(ctx, "ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500172 rpc := "reconcile_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000173 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700174 if err != nil {
175 return nil, err
176 }
khenaidooba6b6c42019-08-02 09:11:56 -0400177 args := []*kafka.KVArg{
178 {Key: "device", Value: device},
179 }
khenaidooba6b6c42019-08-02 09:11:56 -0400180 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700181 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400182}
183
Kent Hagerman2b216042020-04-03 18:28:56 -0400184// DownloadImage invokes download image rpc
185func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500187 rpc := "download_image"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000188 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700189 if err != nil {
190 return nil, err
191 }
khenaidoo442e7c72020-03-10 16:13:48 -0400192 args := []*kafka.KVArg{
193 {Key: "device", Value: device},
194 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500195 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500196 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700197 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400198}
199
Kent Hagerman2b216042020-04-03 18:28:56 -0400200// GetImageDownloadStatus invokes get image download status rpc
201func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000202 logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500203 rpc := "get_image_download_status"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000204 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700205 if err != nil {
206 return nil, err
207 }
khenaidoo442e7c72020-03-10 16:13:48 -0400208 args := []*kafka.KVArg{
209 {Key: "device", Value: device},
210 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500211 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500212 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700213 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400214}
215
Kent Hagerman2b216042020-04-03 18:28:56 -0400216// CancelImageDownload invokes cancel image download rpc
217func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000218 logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500219 rpc := "cancel_image_download"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000220 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700221 if err != nil {
222 return nil, err
223 }
khenaidoo442e7c72020-03-10 16:13:48 -0400224 args := []*kafka.KVArg{
225 {Key: "device", Value: device},
226 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500227 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500228 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700229 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400230}
231
Kent Hagerman2b216042020-04-03 18:28:56 -0400232// ActivateImageUpdate invokes activate image update rpc
233func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000234 logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500235 rpc := "activate_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000236 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700237 if err != nil {
238 return nil, err
239 }
khenaidoo442e7c72020-03-10 16:13:48 -0400240 args := []*kafka.KVArg{
241 {Key: "device", Value: device},
242 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500243 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500244 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700245 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400246}
247
Kent Hagerman2b216042020-04-03 18:28:56 -0400248// RevertImageUpdate invokes revert image update rpc
249func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500251 rpc := "revert_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000252 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700253 if err != nil {
254 return nil, err
255 }
khenaidoo442e7c72020-03-10 16:13:48 -0400256 args := []*kafka.KVArg{
257 {Key: "device", Value: device},
258 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500259 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500260 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700261 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400262}
263
Kent Hagermana7c9d792020-07-16 17:39:01 -0400264func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
266 toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700267 if err != nil {
268 return nil, err
269 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500270 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400271 args := []*kafka.KVArg{
272 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
273 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
274 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500275 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500276 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700277 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500278}
279
Kent Hagerman2b216042020-04-03 18:28:56 -0400280// UpdateFlowsBulk invokes update flows bulk rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400281func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
282 logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
Rohan Agrawal31f21802020-06-12 05:38:46 +0000283 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700284 if err != nil {
285 return nil, err
286 }
khenaidoo19d7b632018-10-30 10:49:50 -0400287 rpc := "update_flows_bulk"
Kent Hagermana7c9d792020-07-16 17:39:01 -0400288
289 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
290 for _, flow := range flows {
291 flowSlice[ctr] = flow
292 ctr++
293 }
294 ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
295 for _, group := range groups {
296 groupSlice[ctr] = group
297 ctr++
298 }
khenaidoo442e7c72020-03-10 16:13:48 -0400299 args := []*kafka.KVArg{
300 {Key: "device", Value: device},
Kent Hagermana7c9d792020-07-16 17:39:01 -0400301 {Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
302 {Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
khenaidoo442e7c72020-03-10 16:13:48 -0400303 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400304 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500305 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000306 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400307}
308
Kent Hagerman2b216042020-04-03 18:28:56 -0400309// UpdateFlowsIncremental invokes update flows incremental rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400310func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000311 logger.Debugw(ctx, "UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400312 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400313 "device-id": device.Id,
314 "flow-to-add-count": len(flowChanges.ToAdd.Items),
315 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
316 "group-to-add-count": len(groupChanges.ToAdd.Items),
317 "group-to-delete-count": len(groupChanges.ToRemove.Items),
318 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400319 })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000320 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700321 if err != nil {
322 return nil, err
323 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400324 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400325 args := []*kafka.KVArg{
326 {Key: "device", Value: device},
327 {Key: "flow_changes", Value: flowChanges},
328 {Key: "group_changes", Value: groupChanges},
329 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400330 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500331 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000332 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400333}
334
Kent Hagerman2b216042020-04-03 18:28:56 -0400335// UpdatePmConfigs invokes update pm configs rpc
336func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000337 logger.Debugw(ctx, "UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
338 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700339 if err != nil {
340 return nil, err
341 }
Rohan Agrawal2a0c4492020-06-29 11:55:06 +0000342 rpc := "update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400343 args := []*kafka.KVArg{
344 {Key: "device", Value: device},
345 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400346 }
khenaidoob3127472019-07-24 21:04:55 -0400347 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700348 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400349}
350
Kent Hagerman2b216042020-04-03 18:28:56 -0400351// SimulateAlarm invokes simulate alarm rpc
352func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000353 logger.Debugw(ctx, "SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700354 rpc := "simulate_alarm"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000355 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700356 if err != nil {
357 return nil, err
358 }
khenaidoo442e7c72020-03-10 16:13:48 -0400359 args := []*kafka.KVArg{
360 {Key: "device", Value: device},
361 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700362 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700363 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700364 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700365}
kesavandbc2d1622020-01-21 00:42:01 -0500366
Kent Hagerman2b216042020-04-03 18:28:56 -0400367func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000368 logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500369 rpc := "disable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000370 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700371 if err != nil {
372 return nil, err
373 }
khenaidoo442e7c72020-03-10 16:13:48 -0400374 args := []*kafka.KVArg{
375 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
376 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500377 }
kesavandbc2d1622020-01-21 00:42:01 -0500378 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700379 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500380}
381
Kent Hagerman2b216042020-04-03 18:28:56 -0400382func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000383 logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500384 rpc := "enable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000385 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700386 if err != nil {
387 return nil, err
388 }
khenaidoo442e7c72020-03-10 16:13:48 -0400389 args := []*kafka.KVArg{
390 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
391 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500392 }
kesavandbc2d1622020-01-21 00:42:01 -0500393 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700394 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500395}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500396
Kent Hagerman2b216042020-04-03 18:28:56 -0400397// ChildDeviceLost invokes child device_lost rpc
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800398func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, childDevice *voltha.Device) (chan *kafka.RpcResponse, error) {
399 logger.Debugw(ctx, "ChildDeviceLost",
400 log.Fields{"device-id": childDevice.ParentId, "parent-port-no": childDevice.ParentPortNo,
401 "onu-id": childDevice.ProxyAddress.OnuId, "serial-number": childDevice.SerialNumber})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500402 rpc := "child_device_lost"
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800403 toTopic, err := ap.getAdapterTopic(ctx, childDevice.ParentId, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700404 if err != nil {
405 return nil, err
406 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500407 args := []*kafka.KVArg{
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800408 {Key: "childDevice", Value: childDevice},
khenaidoo442e7c72020-03-10 16:13:48 -0400409 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500410 replyToTopic := ap.getCoreTopic()
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800411 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, childDevice.ParentId, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500412}
onkarkundargi87285252020-01-27 11:34:52 +0530413
Kent Hagerman2b216042020-04-03 18:28:56 -0400414func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000415 logger.Debugw(ctx, "Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530416 rpc := "start_omci_test"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000417 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700418 if err != nil {
419 return nil, err
420 }
onkarkundargi87285252020-01-27 11:34:52 +0530421 // Use a device specific topic as we are the only core handling requests for this device
422 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700423 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
424 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700425 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530426 &kafka.KVArg{Key: "device", Value: device},
427 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
428}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800429
430func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
Girish Kumar3e8ee212020-08-19 17:50:11 +0000431 logger.Debugw(ctx, "GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800432 rpc := "get_ext_value"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000433 toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800434 if err != nil {
435 return nil, err
436 }
437 // Use a device specific topic to send the request. The adapter handling the device creates a device
438 // specific topic
439 args := []*kafka.KVArg{
440 {
441 Key: "pDeviceId",
442 Value: &ic.StrType{Val: pdevice.Id},
443 },
444 {
445 Key: "device",
446 Value: cdevice,
447 },
448 {
449 Key: "valuetype",
450 Value: &ic.IntType{Val: int64(valuetype)},
451 }}
452
453 replyToTopic := ap.getCoreTopic()
454 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
455}
dpaul62686312020-06-23 14:17:36 +0530456
457// SetExtValue set some given configs or value
458func (ap *AdapterProxy) SetExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (chan *kafka.RpcResponse, error) {
459 logger.Debugw(ctx, "SetExtValue", log.Fields{"device-id": value.Id})
460 rpc := "set_ext_value"
461 toTopic, err := ap.getAdapterTopic(ctx, value.Id, device.Adapter)
462 if err != nil {
463 return nil, err
464 }
465 // Use a device specific topic to send the request. The adapter handling the device creates a device
466 // specific topic
467 args := []*kafka.KVArg{
468 {
469 Key: "value",
470 Value: value,
471 },
472 }
473 replyToTopic := ap.getCoreTopic()
474 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, value.Id, args...)
475}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530476
477// GetSingleValue get a value from the adapter, based on the request type
478func (ap *AdapterProxy) GetSingleValue(ctx context.Context, adapterType string, request *extension.SingleGetValueRequest) (chan *kafka.RpcResponse, error) {
479 logger.Debugw(ctx, "GetSingleValue", log.Fields{"device-id": request.TargetId})
480 rpc := "single_get_value_request"
481 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
482 if err != nil {
483 return nil, err
484 }
485
486 // Use a device specific topic to send the request. The adapter handling the device creates a device
487 // specific topic
488 args := []*kafka.KVArg{
489 {
490 Key: "request",
491 Value: request,
492 },
493 }
494
495 replyToTopic := ap.getCoreTopic()
496 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
497}
498
499// SetSingleValue set a single value on the adapter, based on the request type
500func (ap *AdapterProxy) SetSingleValue(ctx context.Context, adapterType string, request *extension.SingleSetValueRequest) (chan *kafka.RpcResponse, error) {
501 logger.Debugw(ctx, "SetSingleValue", log.Fields{"device-id": request.TargetId})
502 rpc := "single_set_value_request"
503 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
504 if err != nil {
505 return nil, err
506 }
507
508 // Use a device specific topic to send the request. The adapter handling the device creates a device
509 // specific topic
510 args := []*kafka.KVArg{
511 {
512 Key: "request",
513 Value: request,
514 },
515 }
516
517 replyToTopic := ap.getCoreTopic()
518 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
519}
ssiddiquif076cb82021-04-23 10:47:04 +0530520
521// DownloadImageToOnuDevice invokes download image rpc
522func (ap *AdapterProxy) DownloadImageToOnuDevice(ctx context.Context, device *voltha.Device, downloadRequest *voltha.DeviceImageDownloadRequest) (chan *kafka.RpcResponse, error) {
523 logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": device.Id, "image": downloadRequest.Image.Name})
524 rpc := "Download_onu_image"
525 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
526 if err != nil {
527 return nil, err
528 }
529 args := []*kafka.KVArg{
530 {Key: "deviceImageDownloadReq", Value: downloadRequest},
531 }
532 replyToTopic := ap.getCoreTopic()
533 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
534}
535
536func (ap *AdapterProxy) GetOnuImageStatus(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
537 logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": device.Id})
538 rpc := "Get_onu_image_status"
539 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
540 if err != nil {
541 return nil, err
542 }
543 args := []*kafka.KVArg{
544 {Key: "deviceImageReq", Value: request},
545 }
546 replyToTopic := ap.getCoreTopic()
547 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
548}
549
550func (ap *AdapterProxy) ActivateOnuImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
551 logger.Debugw(ctx, "activate-onu-image", log.Fields{"device-id": device.Id})
552 rpc := "Activate_onu_image"
553 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
554 if err != nil {
555 return nil, err
556 }
557 args := []*kafka.KVArg{
558 {Key: "deviceImageReq", Value: request},
559 }
560 replyToTopic := ap.getCoreTopic()
561 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
562}
563
564func (ap *AdapterProxy) AbortImageUpgrade(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
565 logger.Debugw(ctx, "abort-image-upgrade", log.Fields{"device-id": device.Id})
566 rpc := "Abort_onu_image_upgrade"
567 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
568 if err != nil {
569 return nil, err
570 }
571 args := []*kafka.KVArg{
572 {Key: "deviceImageReq", Value: request},
573 }
574 replyToTopic := ap.getCoreTopic()
575 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
576}
577
578func (ap *AdapterProxy) CommitImage(ctx context.Context, device *voltha.Device, request *voltha.DeviceImageRequest) (chan *kafka.RpcResponse, error) {
579 logger.Debugw(ctx, "commit-image", log.Fields{"device-id": device.Id})
580 rpc := "Commit_onu_image"
581 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
582 if err != nil {
583 return nil, err
584 }
585 args := []*kafka.KVArg{
586 {Key: "deviceImageReq", Value: request},
587 }
588 replyToTopic := ap.getCoreTopic()
589 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
590}
591
592func (ap *AdapterProxy) GetOnuImages(ctx context.Context, device *voltha.Device, id *common.ID) (chan *kafka.RpcResponse, error) {
ssiddiqui47348f62021-05-20 20:41:15 +0530593 logger.Debug(ctx, "get-onu-images")
ssiddiquif076cb82021-04-23 10:47:04 +0530594 rpc := "Get_onu_images"
595 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
596 if err != nil {
597 return nil, err
598 }
599 args := []*kafka.KVArg{
ssiddiqui47348f62021-05-20 20:41:15 +0530600 {Key: "deviceId", Value: &ic.StrType{Val: id.Id}},
ssiddiquif076cb82021-04-23 10:47:04 +0530601 }
602 replyToTopic := ap.getCoreTopic()
603 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
604}