blob: 3d25a43a7fd02556c89e0e104d02daf364f123ac [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
19import (
20 "context"
serkant.uluderya8ff291d2020-05-20 00:58:00 -070021
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v3/pkg/log"
24 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
Kent Hagermana7c9d792020-07-16 17:39:01 -040025 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080026 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040027)
28
npujar1d86a522019-11-14 17:11:16 +053029// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040030type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040031 kafka.EndpointManager
khenaidoo54e0ddf2019-02-27 16:21:33 -050032 deviceTopicRegistered bool
serkant.uluderya8ff291d2020-05-20 00:58:00 -070033 coreTopic string
npujar467fe752020-01-16 20:17:45 +053034 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040035}
36
npujar1d86a522019-11-14 17:11:16 +053037// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070038func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040039 return &AdapterProxy{
Kent Hagerman2b216042020-04-03 18:28:56 -040040 EndpointManager: endpointManager,
Kent Hagermana6d0c362019-07-30 12:50:21 -040041 kafkaICProxy: kafkaProxy,
serkant.uluderya8ff291d2020-05-20 00:58:00 -070042 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040043 deviceTopicRegistered: false,
44 }
khenaidoob9203542018-09-17 22:56:37 -040045}
46
serkant.uluderya334479d2019-04-10 08:26:15 -070047func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070048 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050049}
50
Rohan Agrawal31f21802020-06-12 05:38:46 +000051func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
Matteo Scandolod525ae32020-04-02 17:27:29 -070052
Rohan Agrawal31f21802020-06-12 05:38:46 +000053 endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070054 if err != nil {
55 return nil, err
56 }
57
58 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050059}
60
khenaidoo442e7c72020-03-10 16:13:48 -040061func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
62 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
63
64 // Sent the request to kafka
65 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
66
67 // Wait for first response which would indicate whether the request was successfully sent to kafka.
68 firstResponse, ok := <-respChnl
69 if !ok || firstResponse.MType != kafka.RpcSent {
Rohan Agrawal31f21802020-06-12 05:38:46 +000070 logger.Errorw(ctx, "failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040071 return nil, firstResponse.Err
72 }
73 // return the kafka channel for the caller to wait for the response of the RPC call
74 return respChnl, nil
75}
76
Kent Hagerman2b216042020-04-03 18:28:56 -040077// AdoptDevice invokes adopt device rpc
78func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000079 logger.Debugw(ctx, "AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040080 rpc := "adopt_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000081 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070082 if err != nil {
83 return nil, err
84 }
khenaidoo442e7c72020-03-10 16:13:48 -040085 args := []*kafka.KVArg{
86 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040087 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050088 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 ap.deviceTopicRegistered = true
Rohan Agrawal31f21802020-06-12 05:38:46 +000090 logger.Debugw(ctx, "adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
Matteo Scandolod525ae32020-04-02 17:27:29 -070091 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040092}
93
Kent Hagerman2b216042020-04-03 18:28:56 -040094// DisableDevice invokes disable device rpc
95func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000096 logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040097 rpc := "disable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000098 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070099 if err != nil {
100 return nil, err
101 }
khenaidoo442e7c72020-03-10 16:13:48 -0400102 args := []*kafka.KVArg{
103 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400104 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500105 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700106 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
Kent Hagerman2b216042020-04-03 18:28:56 -0400109// ReEnableDevice invokes reenable device rpc
110func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000111 logger.Debugw(ctx, "ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400112 rpc := "reenable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000113 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700114 if err != nil {
115 return nil, err
116 }
khenaidoo442e7c72020-03-10 16:13:48 -0400117 args := []*kafka.KVArg{
118 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500120 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700121 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400122}
123
Kent Hagerman2b216042020-04-03 18:28:56 -0400124// RebootDevice invokes reboot device rpc
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 rpc := "reboot_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000128 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700129 if err != nil {
130 return nil, err
131 }
khenaidoo442e7c72020-03-10 16:13:48 -0400132 args := []*kafka.KVArg{
133 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400134 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700136 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137}
138
Kent Hagerman2b216042020-04-03 18:28:56 -0400139// DeleteDevice invokes delete device rpc
140func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000141 logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400142 rpc := "delete_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000143 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700144 if err != nil {
145 return nil, err
146 }
khenaidoo442e7c72020-03-10 16:13:48 -0400147 args := []*kafka.KVArg{
148 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400149 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700151 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400152}
153
Kent Hagerman2b216042020-04-03 18:28:56 -0400154// GetOfpDeviceInfo invokes get ofp device info rpc
155func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000156 logger.Debugw(ctx, "GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400157 rpc := "get_ofp_device_info"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000158 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700159 if err != nil {
160 return nil, err
161 }
khenaidoo442e7c72020-03-10 16:13:48 -0400162 args := []*kafka.KVArg{
163 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400164 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500165 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700166 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400167}
168
Kent Hagerman2b216042020-04-03 18:28:56 -0400169// ReconcileDevice invokes reconcile device rpc
170func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000171 logger.Debugw(ctx, "ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500172 rpc := "reconcile_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000173 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700174 if err != nil {
175 return nil, err
176 }
khenaidooba6b6c42019-08-02 09:11:56 -0400177 args := []*kafka.KVArg{
178 {Key: "device", Value: device},
179 }
khenaidooba6b6c42019-08-02 09:11:56 -0400180 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700181 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400182}
183
Kent Hagerman2b216042020-04-03 18:28:56 -0400184// DownloadImage invokes download image rpc
185func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500187 rpc := "download_image"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000188 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700189 if err != nil {
190 return nil, err
191 }
khenaidoo442e7c72020-03-10 16:13:48 -0400192 args := []*kafka.KVArg{
193 {Key: "device", Value: device},
194 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500195 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500196 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700197 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400198}
199
Kent Hagerman2b216042020-04-03 18:28:56 -0400200// GetImageDownloadStatus invokes get image download status rpc
201func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000202 logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500203 rpc := "get_image_download_status"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000204 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700205 if err != nil {
206 return nil, err
207 }
khenaidoo442e7c72020-03-10 16:13:48 -0400208 args := []*kafka.KVArg{
209 {Key: "device", Value: device},
210 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500211 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500212 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700213 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400214}
215
Kent Hagerman2b216042020-04-03 18:28:56 -0400216// CancelImageDownload invokes cancel image download rpc
217func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000218 logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500219 rpc := "cancel_image_download"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000220 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700221 if err != nil {
222 return nil, err
223 }
khenaidoo442e7c72020-03-10 16:13:48 -0400224 args := []*kafka.KVArg{
225 {Key: "device", Value: device},
226 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500227 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500228 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700229 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400230}
231
Kent Hagerman2b216042020-04-03 18:28:56 -0400232// ActivateImageUpdate invokes activate image update rpc
233func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000234 logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500235 rpc := "activate_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000236 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700237 if err != nil {
238 return nil, err
239 }
khenaidoo442e7c72020-03-10 16:13:48 -0400240 args := []*kafka.KVArg{
241 {Key: "device", Value: device},
242 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500243 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500244 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700245 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400246}
247
Kent Hagerman2b216042020-04-03 18:28:56 -0400248// RevertImageUpdate invokes revert image update rpc
249func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500251 rpc := "revert_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000252 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700253 if err != nil {
254 return nil, err
255 }
khenaidoo442e7c72020-03-10 16:13:48 -0400256 args := []*kafka.KVArg{
257 {Key: "device", Value: device},
258 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500259 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500260 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700261 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400262}
263
Kent Hagermana7c9d792020-07-16 17:39:01 -0400264func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
266 toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700267 if err != nil {
268 return nil, err
269 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500270 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400271 args := []*kafka.KVArg{
272 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
273 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
274 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500275 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500276 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700277 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500278}
279
Kent Hagerman2b216042020-04-03 18:28:56 -0400280// UpdateFlowsBulk invokes update flows bulk rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400281func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
282 logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
Rohan Agrawal31f21802020-06-12 05:38:46 +0000283 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700284 if err != nil {
285 return nil, err
286 }
khenaidoo19d7b632018-10-30 10:49:50 -0400287 rpc := "update_flows_bulk"
Kent Hagermana7c9d792020-07-16 17:39:01 -0400288
289 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
290 for _, flow := range flows {
291 flowSlice[ctr] = flow
292 ctr++
293 }
294 ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
295 for _, group := range groups {
296 groupSlice[ctr] = group
297 ctr++
298 }
khenaidoo442e7c72020-03-10 16:13:48 -0400299 args := []*kafka.KVArg{
300 {Key: "device", Value: device},
Kent Hagermana7c9d792020-07-16 17:39:01 -0400301 {Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
302 {Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
khenaidoo442e7c72020-03-10 16:13:48 -0400303 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400304 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500305 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000306 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400307}
308
Kent Hagerman2b216042020-04-03 18:28:56 -0400309// UpdateFlowsIncremental invokes update flows incremental rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400310func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000311 logger.Debugw(ctx, "UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400312 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400313 "device-id": device.Id,
314 "flow-to-add-count": len(flowChanges.ToAdd.Items),
315 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
316 "group-to-add-count": len(groupChanges.ToAdd.Items),
317 "group-to-delete-count": len(groupChanges.ToRemove.Items),
318 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400319 })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000320 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700321 if err != nil {
322 return nil, err
323 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400324 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400325 args := []*kafka.KVArg{
326 {Key: "device", Value: device},
327 {Key: "flow_changes", Value: flowChanges},
328 {Key: "group_changes", Value: groupChanges},
329 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400330 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500331 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000332 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400333}
334
Kent Hagerman2b216042020-04-03 18:28:56 -0400335// UpdatePmConfigs invokes update pm configs rpc
336func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000337 logger.Debugw(ctx, "UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
338 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700339 if err != nil {
340 return nil, err
341 }
Rohan Agrawal2a0c4492020-06-29 11:55:06 +0000342 rpc := "update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400343 args := []*kafka.KVArg{
344 {Key: "device", Value: device},
345 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400346 }
khenaidoob3127472019-07-24 21:04:55 -0400347 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700348 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400349}
350
Kent Hagerman2b216042020-04-03 18:28:56 -0400351// SimulateAlarm invokes simulate alarm rpc
352func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000353 logger.Debugw(ctx, "SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700354 rpc := "simulate_alarm"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000355 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700356 if err != nil {
357 return nil, err
358 }
khenaidoo442e7c72020-03-10 16:13:48 -0400359 args := []*kafka.KVArg{
360 {Key: "device", Value: device},
361 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700362 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700363 replyToTopic := ap.getCoreTopic()
364 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -0700365 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700366}
kesavandbc2d1622020-01-21 00:42:01 -0500367
Kent Hagerman2b216042020-04-03 18:28:56 -0400368func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000369 logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500370 rpc := "disable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000371 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700372 if err != nil {
373 return nil, err
374 }
khenaidoo442e7c72020-03-10 16:13:48 -0400375 args := []*kafka.KVArg{
376 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
377 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500378 }
kesavandbc2d1622020-01-21 00:42:01 -0500379 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700380 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500381}
382
Kent Hagerman2b216042020-04-03 18:28:56 -0400383func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000384 logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500385 rpc := "enable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000386 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700387 if err != nil {
388 return nil, err
389 }
khenaidoo442e7c72020-03-10 16:13:48 -0400390 args := []*kafka.KVArg{
391 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
392 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500393 }
kesavandbc2d1622020-01-21 00:42:01 -0500394 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700395 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500396}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500397
Kent Hagerman2b216042020-04-03 18:28:56 -0400398// ChildDeviceLost invokes child device_lost rpc
399func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000400 logger.Debugw(ctx, "ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500401 rpc := "child_device_lost"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000402 toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700403 if err != nil {
404 return nil, err
405 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500406 args := []*kafka.KVArg{
Matteo Scandolod525ae32020-04-02 17:27:29 -0700407 {Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
khenaidoo442e7c72020-03-10 16:13:48 -0400408 {Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
409 {Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
410 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500411 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700412 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500413}
onkarkundargi87285252020-01-27 11:34:52 +0530414
Kent Hagerman2b216042020-04-03 18:28:56 -0400415func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000416 logger.Debugw(ctx, "Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530417 rpc := "start_omci_test"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000418 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700419 if err != nil {
420 return nil, err
421 }
onkarkundargi87285252020-01-27 11:34:52 +0530422 // Use a device specific topic as we are the only core handling requests for this device
423 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700424 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
425 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700426 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530427 &kafka.KVArg{Key: "device", Value: device},
428 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
429}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800430
431func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
432 log.Debugw("GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
433 rpc := "get_ext_value"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000434 toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800435 if err != nil {
436 return nil, err
437 }
438 // Use a device specific topic to send the request. The adapter handling the device creates a device
439 // specific topic
440 args := []*kafka.KVArg{
441 {
442 Key: "pDeviceId",
443 Value: &ic.StrType{Val: pdevice.Id},
444 },
445 {
446 Key: "device",
447 Value: cdevice,
448 },
449 {
450 Key: "valuetype",
451 Value: &ic.IntType{Val: int64(valuetype)},
452 }}
453
454 replyToTopic := ap.getCoreTopic()
455 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
456}