blob: aba551ec7cb38f31946133e442262435985eaa9d [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
import (
	"context"
	"errors"

	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v4/pkg/log"
	"github.com/opencord/voltha-protos/v4/go/extension"
	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
	"github.com/opencord/voltha-protos/v4/go/voltha"
)
29
npujar1d86a522019-11-14 17:11:16 +053030// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040031type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040032 kafka.EndpointManager
khenaidoodd3324d2021-04-27 16:22:55 -040033 coreTopic string
34 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040035}
36
npujar1d86a522019-11-14 17:11:16 +053037// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070038func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040039 return &AdapterProxy{
khenaidoodd3324d2021-04-27 16:22:55 -040040 EndpointManager: endpointManager,
41 kafkaICProxy: kafkaProxy,
42 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040043 }
khenaidoob9203542018-09-17 22:56:37 -040044}
45
serkant.uluderya334479d2019-04-10 08:26:15 -070046func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070047 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050048}
49
Rohan Agrawal31f21802020-06-12 05:38:46 +000050func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
Matteo Scandolod525ae32020-04-02 17:27:29 -070051
Rohan Agrawal31f21802020-06-12 05:38:46 +000052 endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070053 if err != nil {
54 return nil, err
55 }
56
57 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050058}
59
khenaidoo442e7c72020-03-10 16:13:48 -040060func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
61 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
62
63 // Sent the request to kafka
64 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
65
66 // Wait for first response which would indicate whether the request was successfully sent to kafka.
67 firstResponse, ok := <-respChnl
68 if !ok || firstResponse.MType != kafka.RpcSent {
Rohan Agrawal31f21802020-06-12 05:38:46 +000069 logger.Errorw(ctx, "failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040070 return nil, firstResponse.Err
71 }
72 // return the kafka channel for the caller to wait for the response of the RPC call
73 return respChnl, nil
74}
75
Kent Hagerman2b216042020-04-03 18:28:56 -040076// AdoptDevice invokes adopt device rpc
77func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000078 logger.Debugw(ctx, "AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040079 rpc := "adopt_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000080 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070081 if err != nil {
82 return nil, err
83 }
khenaidoo442e7c72020-03-10 16:13:48 -040084 args := []*kafka.KVArg{
85 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040086 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050087 replyToTopic := ap.getCoreTopic()
Rohan Agrawal31f21802020-06-12 05:38:46 +000088 logger.Debugw(ctx, "adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
Matteo Scandolod525ae32020-04-02 17:27:29 -070089 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040090}
91
Kent Hagerman2b216042020-04-03 18:28:56 -040092// DisableDevice invokes disable device rpc
93func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +000094 logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040095 rpc := "disable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +000096 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -070097 if err != nil {
98 return nil, err
99 }
khenaidoo442e7c72020-03-10 16:13:48 -0400100 args := []*kafka.KVArg{
101 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400102 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500103 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700104 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400105}
106
Kent Hagerman2b216042020-04-03 18:28:56 -0400107// ReEnableDevice invokes reenable device rpc
108func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000109 logger.Debugw(ctx, "ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400110 rpc := "reenable_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000111 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700112 if err != nil {
113 return nil, err
114 }
khenaidoo442e7c72020-03-10 16:13:48 -0400115 args := []*kafka.KVArg{
116 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400117 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500118 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700119 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400120}
121
Kent Hagerman2b216042020-04-03 18:28:56 -0400122// RebootDevice invokes reboot device rpc
123func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000124 logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400125 rpc := "reboot_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000126 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700127 if err != nil {
128 return nil, err
129 }
khenaidoo442e7c72020-03-10 16:13:48 -0400130 args := []*kafka.KVArg{
131 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400132 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500133 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700134 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400135}
136
Kent Hagerman2b216042020-04-03 18:28:56 -0400137// DeleteDevice invokes delete device rpc
138func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000139 logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400140 rpc := "delete_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000141 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700142 if err != nil {
143 return nil, err
144 }
khenaidoo442e7c72020-03-10 16:13:48 -0400145 args := []*kafka.KVArg{
146 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400147 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500148 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700149 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400150}
151
Kent Hagerman2b216042020-04-03 18:28:56 -0400152// GetOfpDeviceInfo invokes get ofp device info rpc
153func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000154 logger.Debugw(ctx, "GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400155 rpc := "get_ofp_device_info"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000156 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700157 if err != nil {
158 return nil, err
159 }
khenaidoo442e7c72020-03-10 16:13:48 -0400160 args := []*kafka.KVArg{
161 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400162 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500163 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700164 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400165}
166
Kent Hagerman2b216042020-04-03 18:28:56 -0400167// ReconcileDevice invokes reconcile device rpc
168func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000169 logger.Debugw(ctx, "ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500170 rpc := "reconcile_device"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000171 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700172 if err != nil {
173 return nil, err
174 }
khenaidooba6b6c42019-08-02 09:11:56 -0400175 args := []*kafka.KVArg{
176 {Key: "device", Value: device},
177 }
khenaidooba6b6c42019-08-02 09:11:56 -0400178 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700179 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400180}
181
Kent Hagerman2b216042020-04-03 18:28:56 -0400182// DownloadImage invokes download image rpc
183func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000184 logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500185 rpc := "download_image"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700187 if err != nil {
188 return nil, err
189 }
khenaidoo442e7c72020-03-10 16:13:48 -0400190 args := []*kafka.KVArg{
191 {Key: "device", Value: device},
192 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500193 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500194 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700195 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400196}
197
Kent Hagerman2b216042020-04-03 18:28:56 -0400198// GetImageDownloadStatus invokes get image download status rpc
199func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000200 logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500201 rpc := "get_image_download_status"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000202 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700203 if err != nil {
204 return nil, err
205 }
khenaidoo442e7c72020-03-10 16:13:48 -0400206 args := []*kafka.KVArg{
207 {Key: "device", Value: device},
208 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500209 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500210 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700211 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400212}
213
Kent Hagerman2b216042020-04-03 18:28:56 -0400214// CancelImageDownload invokes cancel image download rpc
215func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000216 logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500217 rpc := "cancel_image_download"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000218 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700219 if err != nil {
220 return nil, err
221 }
khenaidoo442e7c72020-03-10 16:13:48 -0400222 args := []*kafka.KVArg{
223 {Key: "device", Value: device},
224 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500225 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500226 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700227 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400228}
229
Kent Hagerman2b216042020-04-03 18:28:56 -0400230// ActivateImageUpdate invokes activate image update rpc
231func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000232 logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500233 rpc := "activate_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000234 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700235 if err != nil {
236 return nil, err
237 }
khenaidoo442e7c72020-03-10 16:13:48 -0400238 args := []*kafka.KVArg{
239 {Key: "device", Value: device},
240 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500241 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500242 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700243 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400244}
245
Kent Hagerman2b216042020-04-03 18:28:56 -0400246// RevertImageUpdate invokes revert image update rpc
247func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000248 logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500249 rpc := "revert_image_update"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700251 if err != nil {
252 return nil, err
253 }
khenaidoo442e7c72020-03-10 16:13:48 -0400254 args := []*kafka.KVArg{
255 {Key: "device", Value: device},
256 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500257 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500258 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700259 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400260}
261
Kent Hagermana7c9d792020-07-16 17:39:01 -0400262func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000263 logger.Debugw(ctx, "PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
264 toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700265 if err != nil {
266 return nil, err
267 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500268 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400269 args := []*kafka.KVArg{
270 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
271 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
272 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500273 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500274 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700275 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500276}
277
Kent Hagerman2b216042020-04-03 18:28:56 -0400278// UpdateFlowsBulk invokes update flows bulk rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400279func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows map[uint64]*ofp.OfpFlowStats, groups map[uint32]*voltha.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
280 logger.Debugw(ctx, "UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows), "group-count": len(groups), "flow-metadata": flowMetadata})
Rohan Agrawal31f21802020-06-12 05:38:46 +0000281 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700282 if err != nil {
283 return nil, err
284 }
khenaidoo19d7b632018-10-30 10:49:50 -0400285 rpc := "update_flows_bulk"
Kent Hagermana7c9d792020-07-16 17:39:01 -0400286
287 ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(flows))
288 for _, flow := range flows {
289 flowSlice[ctr] = flow
290 ctr++
291 }
292 ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(groups))
293 for _, group := range groups {
294 groupSlice[ctr] = group
295 ctr++
296 }
khenaidoo442e7c72020-03-10 16:13:48 -0400297 args := []*kafka.KVArg{
298 {Key: "device", Value: device},
Kent Hagermana7c9d792020-07-16 17:39:01 -0400299 {Key: "flows", Value: &voltha.Flows{Items: flowSlice}},
300 {Key: "groups", Value: &voltha.FlowGroups{Items: groupSlice}},
khenaidoo442e7c72020-03-10 16:13:48 -0400301 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400302 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500303 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000304 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400305}
306
Kent Hagerman2b216042020-04-03 18:28:56 -0400307// UpdateFlowsIncremental invokes update flows incremental rpc
Kent Hagermana7c9d792020-07-16 17:39:01 -0400308func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *ofp.FlowChanges, groupChanges *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000309 logger.Debugw(ctx, "UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400310 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400311 "device-id": device.Id,
312 "flow-to-add-count": len(flowChanges.ToAdd.Items),
313 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
314 "group-to-add-count": len(groupChanges.ToAdd.Items),
315 "group-to-delete-count": len(groupChanges.ToRemove.Items),
316 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400317 })
Rohan Agrawal31f21802020-06-12 05:38:46 +0000318 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700319 if err != nil {
320 return nil, err
321 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400322 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400323 args := []*kafka.KVArg{
324 {Key: "device", Value: device},
325 {Key: "flow_changes", Value: flowChanges},
326 {Key: "group_changes", Value: groupChanges},
327 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400328 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500329 replyToTopic := ap.getCoreTopic()
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000330 return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400331}
332
Kent Hagerman2b216042020-04-03 18:28:56 -0400333// UpdatePmConfigs invokes update pm configs rpc
334func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000335 logger.Debugw(ctx, "UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
336 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700337 if err != nil {
338 return nil, err
339 }
Rohan Agrawal2a0c4492020-06-29 11:55:06 +0000340 rpc := "update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400341 args := []*kafka.KVArg{
342 {Key: "device", Value: device},
343 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400344 }
khenaidoob3127472019-07-24 21:04:55 -0400345 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700346 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400347}
348
Kent Hagerman2b216042020-04-03 18:28:56 -0400349// SimulateAlarm invokes simulate alarm rpc
350func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000351 logger.Debugw(ctx, "SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700352 rpc := "simulate_alarm"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000353 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700354 if err != nil {
355 return nil, err
356 }
khenaidoo442e7c72020-03-10 16:13:48 -0400357 args := []*kafka.KVArg{
358 {Key: "device", Value: device},
359 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700360 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700361 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700362 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700363}
kesavandbc2d1622020-01-21 00:42:01 -0500364
Kent Hagerman2b216042020-04-03 18:28:56 -0400365func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000366 logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500367 rpc := "disable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000368 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700369 if err != nil {
370 return nil, err
371 }
khenaidoo442e7c72020-03-10 16:13:48 -0400372 args := []*kafka.KVArg{
373 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
374 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500375 }
kesavandbc2d1622020-01-21 00:42:01 -0500376 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700377 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500378}
379
Kent Hagerman2b216042020-04-03 18:28:56 -0400380func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000381 logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500382 rpc := "enable_port"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000383 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700384 if err != nil {
385 return nil, err
386 }
khenaidoo442e7c72020-03-10 16:13:48 -0400387 args := []*kafka.KVArg{
388 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
389 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500390 }
kesavandbc2d1622020-01-21 00:42:01 -0500391 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700392 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500393}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500394
Kent Hagerman2b216042020-04-03 18:28:56 -0400395// ChildDeviceLost invokes child device_lost rpc
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800396func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, childDevice *voltha.Device) (chan *kafka.RpcResponse, error) {
397 logger.Debugw(ctx, "ChildDeviceLost",
398 log.Fields{"device-id": childDevice.ParentId, "parent-port-no": childDevice.ParentPortNo,
399 "onu-id": childDevice.ProxyAddress.OnuId, "serial-number": childDevice.SerialNumber})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500400 rpc := "child_device_lost"
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800401 toTopic, err := ap.getAdapterTopic(ctx, childDevice.ParentId, deviceType)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700402 if err != nil {
403 return nil, err
404 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500405 args := []*kafka.KVArg{
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800406 {Key: "childDevice", Value: childDevice},
khenaidoo442e7c72020-03-10 16:13:48 -0400407 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500408 replyToTopic := ap.getCoreTopic()
Girish Gowdra6f9b10e2021-03-11 14:36:39 -0800409 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, childDevice.ParentId, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500410}
onkarkundargi87285252020-01-27 11:34:52 +0530411
Kent Hagerman2b216042020-04-03 18:28:56 -0400412func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000413 logger.Debugw(ctx, "Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530414 rpc := "start_omci_test"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000415 toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700416 if err != nil {
417 return nil, err
418 }
onkarkundargi87285252020-01-27 11:34:52 +0530419 // Use a device specific topic as we are the only core handling requests for this device
420 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700421 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
422 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700423 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530424 &kafka.KVArg{Key: "device", Value: device},
425 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
426}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800427
428func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
Girish Kumar3e8ee212020-08-19 17:50:11 +0000429 logger.Debugw(ctx, "GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800430 rpc := "get_ext_value"
Rohan Agrawal31f21802020-06-12 05:38:46 +0000431 toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800432 if err != nil {
433 return nil, err
434 }
435 // Use a device specific topic to send the request. The adapter handling the device creates a device
436 // specific topic
437 args := []*kafka.KVArg{
438 {
439 Key: "pDeviceId",
440 Value: &ic.StrType{Val: pdevice.Id},
441 },
442 {
443 Key: "device",
444 Value: cdevice,
445 },
446 {
447 Key: "valuetype",
448 Value: &ic.IntType{Val: int64(valuetype)},
449 }}
450
451 replyToTopic := ap.getCoreTopic()
452 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
453}
dpaul62686312020-06-23 14:17:36 +0530454
455// SetExtValue set some given configs or value
456func (ap *AdapterProxy) SetExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (chan *kafka.RpcResponse, error) {
457 logger.Debugw(ctx, "SetExtValue", log.Fields{"device-id": value.Id})
458 rpc := "set_ext_value"
459 toTopic, err := ap.getAdapterTopic(ctx, value.Id, device.Adapter)
460 if err != nil {
461 return nil, err
462 }
463 // Use a device specific topic to send the request. The adapter handling the device creates a device
464 // specific topic
465 args := []*kafka.KVArg{
466 {
467 Key: "value",
468 Value: value,
469 },
470 }
471 replyToTopic := ap.getCoreTopic()
472 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, value.Id, args...)
473}
Salman Siddiqui1cf95042020-11-19 00:42:56 +0530474
475// GetSingleValue get a value from the adapter, based on the request type
476func (ap *AdapterProxy) GetSingleValue(ctx context.Context, adapterType string, request *extension.SingleGetValueRequest) (chan *kafka.RpcResponse, error) {
477 logger.Debugw(ctx, "GetSingleValue", log.Fields{"device-id": request.TargetId})
478 rpc := "single_get_value_request"
479 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
480 if err != nil {
481 return nil, err
482 }
483
484 // Use a device specific topic to send the request. The adapter handling the device creates a device
485 // specific topic
486 args := []*kafka.KVArg{
487 {
488 Key: "request",
489 Value: request,
490 },
491 }
492
493 replyToTopic := ap.getCoreTopic()
494 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
495}
496
497// SetSingleValue set a single value on the adapter, based on the request type
498func (ap *AdapterProxy) SetSingleValue(ctx context.Context, adapterType string, request *extension.SingleSetValueRequest) (chan *kafka.RpcResponse, error) {
499 logger.Debugw(ctx, "SetSingleValue", log.Fields{"device-id": request.TargetId})
500 rpc := "single_set_value_request"
501 toTopic, err := ap.getAdapterTopic(ctx, request.TargetId, adapterType)
502 if err != nil {
503 return nil, err
504 }
505
506 // Use a device specific topic to send the request. The adapter handling the device creates a device
507 // specific topic
508 args := []*kafka.KVArg{
509 {
510 Key: "request",
511 Value: request,
512 },
513 }
514
515 replyToTopic := ap.getCoreTopic()
516 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, request.TargetId, args...)
517}